You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC
svn commit: r749207 [4/12] - in
/incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/
net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/
Added: incubator/cassandra/src/org/apache/cassandra/net/http/HTMLFormatter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HTMLFormatter.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HTMLFormatter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HTMLFormatter.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Helper function to write some basic HTML.
+ */
+
+package org.apache.cassandra.net.http;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HTMLFormatter
+{
+ protected StringBuilder sb_ = null;
+ private boolean writeBody_;
+
+ /** Creates a formatter that accumulates its HTML in a new, empty StringBuilder. */
+ public HTMLFormatter()
+ {
+ sb_ = new StringBuilder();
+ }
+
+ /**
+ * Creates a formatter that appends to the supplied builder.
+ * The reference is stored as-is (no copy is made), so the generated
+ * HTML ends up in the caller's builder.
+ *
+ * @param sb builder that receives the generated HTML
+ */
+ public HTMLFormatter(StringBuilder sb)
+ {
+ sb_ = sb;
+ }
+
+ /**
+ * Opens the document with the default options: no JavaScript callback,
+ * the built-in CSS included, and a body tag written.
+ */
+ public void startBody()
+ {
+ startBody(false, "", true, true);
+ }
+
+    /**
+     * Opens the HTML document and optionally writes a head section and the opening body tag.
+     *
+     * @param writeJSCallback    whether to emit the script block (tab helper plus callback) in the head
+     * @param jsCallbackFunction JavaScript source appended to the script block when writeJSCallback is set
+     * @param writeCSS           whether to emit the built-in style sheet in the head
+     * @param writeBody          whether to emit the opening body tag; remembered for endBody()
+     */
+    public void startBody(boolean writeJSCallback, String jsCallbackFunction, boolean writeCSS, boolean writeBody)
+    {
+        // Remembered so that endBody() knows whether a closing body tag is owed.
+        writeBody_ = writeBody;
+        sb_.append("<html>\n");
+
+        // The head section is only written when there is something to put in it.
+        final boolean needsHead = writeJSCallback || writeCSS;
+        if (needsHead)
+        {
+            sb_.append("<head>\n");
+        }
+        if (writeJSCallback)
+        {
+            addJSCallback(jsCallbackFunction);
+        }
+        if (writeCSS)
+        {
+            addCSS();
+        }
+        if (needsHead)
+        {
+            sb_.append("</head>\n");
+        }
+
+        if (writeBody)
+        {
+            sb_.append("<body bgcolor=black>\n");
+        }
+    }
+
+    /**
+     * Closes the document: writes the closing body tag if startBody() opened one,
+     * then the closing html tag.
+     */
+    public void endBody()
+    {
+        sb_.append(writeBody_ ? "</body>\n</html>\n" : "</html>\n");
+    }
+
+    /**
+     * Appends the given markup followed by an HTML line break.
+     *
+     * @param s markup to append; written verbatim, without escaping
+     */
+    public void appendLine(String s)
+    {
+        sb_.append(s).append("<br>\n");
+    }
+
+ /**
+ * Appends the given markup verbatim; no escaping and no line break is added.
+ *
+ * @param s markup to append
+ */
+ public void append(String s)
+ {
+ sb_.append(s);
+ }
+
+    /**
+     * Wraps the given JavaScript source in a script element and appends it.
+     *
+     * The MIME type is declared through the type attribute (as addJSCallback()
+     * does); the legacy language attribute expects a language name such as
+     * "JavaScript", not a MIME type.
+     *
+     * @param jscript JavaScript source, written verbatim between the script tags
+     */
+    public void addJScript(String jscript)
+    {
+        append("<script type=\"text/javascript\">\n");
+        append(jscript + "\n");
+        append("</script>\n");
+    }
+
+ /** Appends an opening table tag; pair with endTable(). */
+ public void startTable()
+ {
+ sb_.append("<table>\n");
+ }
+
+    /**
+     * Appends one table row made of header cells, one cell per entry.
+     *
+     * @param sTableHeaders header captions, written verbatim in array order
+     */
+    public void addHeaders(String[] sTableHeaders)
+    {
+        sb_.append("<tr style=\"border: 2px solid #333333\" >\n");
+        for (String caption : sTableHeaders)
+        {
+            sb_.append("<th><div class=\"tmenubar\">")
+               .append("<b>").append(caption).append("</b>")
+               .append("</div></th>\n");
+        }
+        sb_.append("\n</tr>\n\n");
+    }
+
+    /**
+     * Appends a table row containing a single header cell.
+     *
+     * @param sTableHeader header caption, written verbatim
+     */
+    public void addHeader(String sTableHeader)
+    {
+        sb_.append("<tr style=\"border: 2px solid #333333\" >\n")
+           .append("<th><div class=\"tmenubar\">")
+           .append("<b>").append(sTableHeader).append("</b>")
+           .append("</div></th>\n")
+           .append("\n</tr>\n\n");
+    }
+
+ /** Appends an opening, bordered table-row tag; pair with endRow(). */
+ public void startRow()
+ {
+ sb_.append("<tr style=\"border: 2px solid #333333\">\n");
+ }
+
+    /**
+     * Appends one bordered table cell.
+     *
+     * @param sData cell content, written verbatim
+     */
+    public void addCol(String sData)
+    {
+        sb_.append("<td style=\"border: 2px solid #333333\">").append(sData).append("</td>");
+    }
+
+ /** Appends the closing table-row tag that matches startRow(). */
+ public void endRow()
+ {
+ sb_.append("</tr>\n");
+ }
+
+ /** Appends the closing table tag that matches startTable(). */
+ public void endTable()
+ {
+ sb_.append("</table>\n");
+ }
+
+ /**
+ * Appends a drop-down list with no entry pre-selected; delegates to the
+ * three-argument overload with a default index of -1.
+ *
+ * @param comboBoxEntries option values, emitted in the set's iteration order
+ * @param htmlElementName value for the select element's name attribute
+ */
+ public void addCombobox(Set<String> comboBoxEntries, String htmlElementName)
+ {
+ addCombobox(comboBoxEntries, htmlElementName, -1);
+ }
+
+ public void addCombobox(Set<String> comboBoxEntries, String htmlElementName, int defaultSelected)
+ {
+ sb_.append(" <select name=" + htmlElementName + " size=1>\n");
+ if(defaultSelected == -1)
+ {
+ sb_.append(" <option value=\"\" SELECTED>Select an option \n");
+ }
+
+ int i = 0;
+ for(String colFamName : comboBoxEntries)
+ {
+ if(defaultSelected == i)
+ {
+ sb_.append(" <option value=\"" + colFamName + "\" SELECTED>" + colFamName + "\n");
+ }
+ else
+ {
+ sb_.append(" <option value=\"" + colFamName + "\">" + colFamName + "\n");
+ }
+ }
+ sb_.append(" </select>\n");
+ }
+
+    /**
+     * Appends a div element with the given id.
+     *
+     * @param divId value for the id attribute
+     * @param value content of the div, written verbatim; null yields an empty div
+     */
+    public void addDivElement(String divId, String value)
+    {
+        sb_.append("<div id = \"").append(divId).append("\">");
+        if (value != null)
+        {
+            sb_.append(value);
+        }
+        sb_.append("</div>\n");
+    }
+
+    /**
+     * Appends a complete bordered table: one row of bold captions followed by
+     * one row per entry of sTable. Nothing is written when sTable is null or empty.
+     *
+     * @param sTableHeaders column captions for the first row
+     * @param sTable        table content, indexed as [row][column]; cells are written verbatim
+     */
+    public void createTable(String[] sTableHeaders, String[][] sTable)
+    {
+        final boolean hasRows = sTable != null && sTable.length != 0;
+        if (!hasRows)
+        {
+            return;
+        }
+
+        final String borderedRow = "<tr style=\"border: 2px solid #333333\">\n";
+        final String borderedCell = "<td style=\"border: 2px solid #333333\">";
+
+        sb_.append("<table style=\"border: 2px solid #333333\">\n");
+
+        // Caption row.
+        sb_.append(borderedRow);
+        for (String caption : sTableHeaders)
+        {
+            sb_.append(borderedCell);
+            sb_.append("<b>").append(caption).append("</b>");
+            sb_.append("</td>\n");
+        }
+        sb_.append("\n</tr>\n\n");
+
+        // Data rows.
+        for (String[] row : sTable)
+        {
+            sb_.append(borderedRow);
+            for (String cell : row)
+            {
+                sb_.append(borderedCell).append(cell).append("</td>\n");
+            }
+            sb_.append("\n</tr>\n\n");
+        }
+        sb_.append("</table>\n");
+    }
+
+    /**
+     * Appends a script element containing the tab-switching helper followed by
+     * the caller's JavaScript.
+     *
+     * @param jsCallbackFunction JavaScript source written verbatim after the tab helper
+     */
+    public void addJSCallback(String jsCallbackFunction)
+    {
+        sb_.append("<script type=\"text/javascript\">\n");
+        addJSForTabs();
+        sb_.append(jsCallbackFunction).append("\n").append("</script>\n");
+    }
+
+    /**
+     * Appends a style element holding the built-in style sheet: page defaults,
+     * the table-header bar, link colours and (via addCSSForTabs()) the tab rules.
+     */
+    public void addCSS()
+    {
+        sb_.append("<style type=\"text/css\">\n");
+
+        // Page-wide defaults.
+        sb_.append("body\n"
+                 + "{\n"
+                 + " color:white;\n"
+                 + " font-family:Arial Unicode MS,Verdana, Arial, Sans-serif;\n"
+                 + " font-size:10pt;\n"
+                 + "}\n");
+
+        // Bar used by the table header cells.
+        sb_.append(".tmenubar\n"
+                 + "{\n"
+                 + " background-color:green;\n"
+                 + " font-family:Verdana, Arial, Sans-serif;\n"
+                 + " font-size:10pt;\n"
+                 + " font-weight:bold;\n"
+                 + "}\n");
+
+        // Table text colour.
+        sb_.append("th\n"
+                 + "{\n"
+                 + " color:white;\n"
+                 + "}\n");
+        sb_.append("td\n"
+                 + "{\n"
+                 + " color:white;\n"
+                 + "}\n");
+
+        // Links.
+        sb_.append("a:link {color:#CAF99B;font-size:10pt;font-weight:bold;font-family:Arial Unicode MS,Lucida-grande,Verdana}\n"
+                 + "a:visited {color:red}\n"
+                 + "a:hover{color:yellow;font-size:10pt;font-weight:bold;font-family:Arial Unicode MS,Lucida-grande,Verdana;background-color:green}\n");
+
+        addCSSForTabs();
+
+        sb_.append("</style>\n");
+    }
+
+ /**
+ * Appends the CSS rules for the tab strip (the #header list and its links),
+ * the #content panel and h1. Only the rules are written; addCSS() supplies
+ * the surrounding style element.
+ */
+ public void addCSSForTabs()
+ {
+ sb_.append("#header ul {\n");
+ sb_.append(" list-style: none;\n");
+ sb_.append(" padding: 0;\n");
+ sb_.append(" margin: 0;\n");
+ sb_.append(" }\n");
+ sb_.append("\n");
+ sb_.append("#header li {\n");
+ sb_.append(" float: left;\n");
+ sb_.append(" border: 1px solid #bbb;\n");
+ sb_.append(" border-bottom-width: 0;\n");
+ sb_.append(" margin: 0;\n");
+ sb_.append("}\n");
+ sb_.append("\n");
+ sb_.append("#header a {\n");
+ sb_.append(" text-decoration: none;\n");
+ sb_.append(" display: block;\n");
+ sb_.append(" background: #eee;\n");
+ sb_.append(" padding: 0.24em 1em;\n");
+ sb_.append(" color: #00c;\n");
+ sb_.append(" width: 8em;\n");
+ sb_.append(" text-align: center;\n");
+ sb_.append(" }\n");
+ sb_.append("\n");
+ sb_.append("#header a:hover {\n");
+ sb_.append(" background: #ddf;\n");
+ sb_.append("}\n");
+ sb_.append("\n");
+ // The tab marked with id "selected" (set by the script from addJSForTabs()).
+ sb_.append("#header #selected {\n");
+ sb_.append(" border-color: black;\n");
+ sb_.append("}\n");
+ sb_.append("\n");
+ sb_.append("#header #selected a {\n");
+ sb_.append(" position: relative;\n");
+ sb_.append(" top: 1px;\n");
+ sb_.append(" background: white;\n");
+ sb_.append(" color: black;\n");
+ sb_.append(" font-weight: bold;\n");
+ sb_.append("}\n");
+ sb_.append("\n");
+ // Content panels start hidden; the script toggles their visibility.
+ sb_.append("#content {\n");
+ sb_.append(" border: 1px solid black;\n");
+ sb_.append(" visibility:hidden;\n");
+ sb_.append(" position:absolute;\n");
+ // NOTE(review): "top:200" carries no unit - confirm whether "200px" was intended.
+ sb_.append(" top:200;\n");
+ sb_.append(" clear: both;\n");
+ sb_.append(" padding: 0 1em;\n");
+ sb_.append("}\n");
+ sb_.append("\n");
+ sb_.append("h1 {\n");
+ sb_.append(" margin: 0;\n");
+ sb_.append(" padding: 0 0 1em 0;\n");
+ sb_.append("}\n");
+ }
+
+ /**
+ * Appends the JavaScript for tab switching: a global holding the current
+ * tab name (initially "one") and a selectTab(tabDivId) function. Only the
+ * script text is written; addJSCallback() supplies the script element.
+ *
+ * The emitted function looks elements up by name and treats the first
+ * match as the tab (its id is switched to/from "selected") and the second
+ * match as the panel (its visibility is toggled).
+ */
+ public void addJSForTabs()
+ {
+ sb_.append("var curSelectedDivId = \"one\";\n");
+ sb_.append("\n");
+ sb_.append("function selectTab(tabDivId)\n");
+ sb_.append("{\n");
+ // Deselect the current tab: hide its panel and restore the tab's id.
+ sb_.append(" var x = document.getElementsByName(curSelectedDivId);\n");
+ sb_.append(" if(x[1])\n");
+ sb_.append(" x[1].style.visibility=\"hidden\";\n");
+ sb_.append(" if(x[0])\n");
+ sb_.append(" x[0].id=curSelectedDivId;\n");
+ sb_.append("\n");
+ sb_.append("\n");
+ // Select the requested tab: show its panel and mark the tab as "selected".
+ sb_.append(" var y = document.getElementsByName(tabDivId);\n");
+ sb_.append(" if(y[1])\n");
+ sb_.append(" y[1].style.visibility=\"visible\";\n");
+ sb_.append(" if(y[0])\n");
+ sb_.append(" y[0].id = \"selected\";\n");
+ sb_.append("\n");
+ sb_.append(" curSelectedDivId = tabDivId;\n");
+ sb_.append("}\n");
+ }
+
+    /**
+     * Returns the HTML accumulated so far.
+     *
+     * @return the current content of the underlying builder
+     */
+    @Override
+    public String toString()
+    {
+        return sb_.toString();
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnection.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnection.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnection.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class accepts a client connection and parses http data from it.
+ */
+
+// TODO: shouldClose_ is not used correctly. It should be used to close the socket? When?
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.net.*;
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SelectionKeyHandler;
+import org.apache.cassandra.net.SelectorManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpConnection extends SelectionKeyHandler implements HttpStartLineParser.Callback, HttpHeaderParser.Callback
+{
+    /* Log under this class's own category so messages are not attributed to StorageService. */
+    private static Logger logger_ = Logger.getLogger(HttpConnection.class);
+ public static final String httpRequestVerbHandler_ = "HTTP-REQUEST-VERB-HANDLER";
+ public static final String httpStage_ = "HTTP-STAGE";
+
+ /*
+ * These are the callbacks into who ever intends
+ * to listen on the client socket.
+ */
+ public interface HttpConnectionListener
+ {
+ /* Callback that receives a request object. */
+ public void onRequest(HttpRequest httpRequest);
+ /* Callback that receives a response object. */
+ public void onResponse(HttpResponse httpResponse);
+ }
+
+ /*
+ * Kind of HTTP message currently being parsed. It starts as UNKNOWN and is
+ * set by onStartLine() once the start line has been read.
+ */
+ enum HttpMessageType
+ {
+ UNKNOWN,
+ REQUEST,
+ RESPONSE
+ }
+
+ /*
+ * Position of parse() within the current message. The states are visited
+ * in declaration order: IN_NEW (nothing read yet), IN_START (start line),
+ * IN_HEADERS (header lines) and IN_BODY (message body).
+ */
+ enum ParseState
+ {
+ IN_NEW,
+ IN_START,
+ IN_HEADERS, IN_BODY
+ }
+
+ private ParseState parseState_ = ParseState.IN_NEW;
+ private long parseStartTime_ = 0;
+ private HttpMessageType currentMsgType_ = HttpMessageType.UNKNOWN;
+ private int contentLength_ = 0;
+ private List<ByteBuffer> bodyBuffers_ = new LinkedList<ByteBuffer>();
+ private boolean shouldClose_ = false;
+ private String defaultContentType_ = "text/html";
+ private HttpRequest currentRequest_ = null;
+ private HttpResponse currentResponse_ = null;
+ private HttpStartLineParser startLineParser_ = new HttpStartLineParser(this);
+ private HttpHeaderParser headerParser_ = new HttpHeaderParser(this);
+ /* Selection Key associated with this HTTP Connection */
+ private SelectionKey httpKey_;
+ /* SocketChannel associated with this HTTP Connection */
+ private SocketChannel httpChannel_;
+ /* HTTPReader instance associated with this HTTP Connection */
+ private HTTPReader httpReader_ = new HTTPReader();
+
+ /*
+ * This abstraction starts reading the data that comes in
+ * on a HTTP request. It accumulates the bytes read into
+ * a buffer and passes the buffer to the HTTP parser.
+ */
+
+    class HTTPReader implements Runnable
+    {
+        /* We read 256 bytes at a time from a HTTP connection */
+        private static final int bufferSize_ = 256;
+
+        /*
+         * Read one buffer from the channel and hand it to the parser.
+         *
+         * read() disabled read interest before scheduling this task and only
+         * parse() re-enables it, so every path that does not reach parse()
+         * must either re-arm the key or close the connection; otherwise the
+         * connection would sit on the selector forever without being read.
+         */
+        public void run()
+        {
+            ByteBuffer readBuffer = ByteBuffer.allocate(HTTPReader.bufferSize_);
+            try
+            {
+                int bytesRead = httpChannel_.read(readBuffer);
+                if ( bytesRead < 0 )
+                {
+                    /* End of stream: the peer closed the connection. */
+                    HttpConnection.this.close();
+                    return;
+                }
+                readBuffer.flip();
+                if ( readBuffer.remaining() > 0 )
+                {
+                    HttpConnection.this.parse(readBuffer);
+                }
+                else
+                {
+                    /* Nothing available right now; wait for the next read event. */
+                    SelectorManager.getSelectorManager().modifyKeyForRead(httpKey_);
+                }
+            }
+            catch ( IOException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+                /* The channel is unusable after a failed read. */
+                HttpConnection.this.close();
+            }
+        }
+    }
+
+    /*
+     * Immutable pairing of a parsed request with the connection it arrived
+     * on; handleRequest() wraps each request in one of these before handing
+     * it to the messaging service.
+     */
+    public static class HttpRequestMessage
+    {
+        private final HttpRequest httpRequest_;
+        private final HttpConnection httpConnection_;
+
+        HttpRequestMessage(HttpRequest httpRequest, HttpConnection httpConnection)
+        {
+            this.httpRequest_ = httpRequest;
+            this.httpConnection_ = httpConnection;
+        }
+
+        /* The parsed request. */
+        public HttpRequest getHttpRequest()
+        {
+            return this.httpRequest_;
+        }
+
+        /* The connection the request was read from. */
+        public HttpConnection getHttpConnection()
+        {
+            return this.httpConnection_;
+        }
+    }
+
+ /*
+ * Read called on the Selector thread. This is called
+ * when there is some HTTP request that needs to be
+ * processed.
+ */
+ public void read(SelectionKey key)
+ {
+ /* Remember the key and its channel the first time this connection is read */
+ if ( httpKey_ == null )
+ {
+ httpKey_ = key;
+ httpChannel_ = (SocketChannel)key.channel();
+ }
+ /* deregister interest for read */
+ key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+ /* Add a task to process the HTTP request */
+ MessagingService.getReadExecutor().execute(httpReader_);
+ }
+
+    /*
+     * Re-enables read interest on the given key.
+     *
+     * The interest set is read from the same key it is written back to;
+     * deriving it from httpKey_ would fail with a NullPointerException
+     * before the first read and would mix up two keys if they ever differed.
+     */
+    public void modifyKeyForRead(SelectionKey key)
+    {
+        key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+    }
+
+ /*
+ * Returns every piece of per-message parsing state to its initial value
+ * so that the next bytes are treated as the start of a new message.
+ */
+ private void resetParserState()
+ {
+ startLineParser_.resetParserState();
+ headerParser_.resetParserState();
+ parseState_ = ParseState.IN_NEW;
+ contentLength_ = 0;
+ /* A new list is allocated instead of clearing: parse() passes the old list to the request via setBody() */
+ bodyBuffers_ = new LinkedList<ByteBuffer>();
+ currentMsgType_ = HttpMessageType.UNKNOWN;
+ currentRequest_ = null;
+ currentResponse_ = null;
+ }
+
+ /*
+ * Cancels this connection's selection key, if one has been recorded.
+ * NOTE(review): httpChannel_ is not closed here; presumably
+ * SelectorManager.cancel() takes care of the channel - confirm.
+ */
+ public void close()
+ {
+ logger_.info("Closing HTTP socket ...");
+ if ( httpKey_ != null )
+ SelectorManager.getSelectorManager().cancel(httpKey_);
+ }
+
+    /*
+     * Feeds newly read bytes into the incremental HTTP parser.
+     *
+     * The parser moves through IN_NEW -> IN_START -> IN_HEADERS -> IN_BODY,
+     * falling through from one stage to the next while the buffer still holds
+     * enough data; when a stage needs more bytes the method returns and
+     * resumes from the saved state on the next call. Once the body is
+     * complete the message is dispatched and the parser state is reset.
+     *
+     * Any failure is logged and the connection is left open. Read interest
+     * is always re-enabled on the way out.
+     */
+    public void parse(ByteBuffer bb)
+    {
+        try
+        {
+            logger_.debug("Processing http requests from socket ...");
+            switch (parseState_)
+            {
+                case IN_NEW:
+                    parseState_ = ParseState.IN_START;
+                    parseStartTime_ = System.currentTimeMillis();
+                    // fall through
+
+                case IN_START:
+                    if (!startLineParser_.onMoreBytesNew(bb))
+                    {
+                        break; // start line incomplete, need more bytes
+                    }
+                    /* The message type was already set through the onStartLine() callback */
+                    parseState_ = ParseState.IN_HEADERS;
+                    // fall through
+
+                case IN_HEADERS:
+                    if (!headerParser_.onMoreBytesNew(bb))
+                    {
+                        break; // need more bytes
+                    }
+                    if (!processHeaders_())
+                    {
+                        return; // message type unknown; already logged
+                    }
+                    parseState_ = ParseState.IN_BODY;
+                    // fall through
+
+                case IN_BODY:
+                    if (contentLength_ > 0)
+                    {
+                        if (bb.remaining() > contentLength_)
+                        {
+                            /* The buffer holds more than the body: take only the body bytes */
+                            int newLimit = bb.position() + contentLength_;
+                            bodyBuffers_.add(((ByteBuffer) bb.duplicate().limit(newLimit)).slice());
+                            bb.position(newLimit);
+                            contentLength_ = 0;
+                        }
+                        else
+                        {
+                            /* Everything left in the buffer belongs to the body */
+                            contentLength_ -= bb.remaining();
+                            bodyBuffers_.add(bb.duplicate());
+                            bb.position(bb.limit());
+                        }
+                    }
+
+                    if (contentLength_ == 0)
+                    {
+                        try
+                        {
+                            dispatchCompletedMessage_();
+                        }
+                        finally
+                        {
+                            /*
+                             * Reset even when dispatch throws; otherwise the state stays
+                             * IN_BODY with nothing left to read and the same message
+                             * would be dispatched again on the next read.
+                             */
+                            resetParserState();
+                        }
+                    }
+            }
+        }
+        catch (final Throwable e)
+        {
+            /* Logged only; the connection is deliberately left open. */
+            logger_.warn(LogUtil.throwableToString(e));
+        }
+        finally
+        {
+            SelectorManager.getSelectorManager().modifyKeyForRead(httpKey_);
+        }
+    }
+
+    /*
+     * Evaluates the completed header block: updates shouldClose_ for requests
+     * and records the declared body length. Returns false (after logging)
+     * when the message type was never determined.
+     */
+    private boolean processHeaders_() throws HttpParsingException
+    {
+        String len;
+        if (currentMsgType_ == HttpMessageType.REQUEST)
+        {
+            len = currentRequest_.getHeader(HttpProtocolConstants.CONTENT_LENGTH);
+            updateShouldClose_();
+        }
+        else if (currentMsgType_ == HttpMessageType.RESPONSE)
+        {
+            len = currentResponse_.getHeader(HttpProtocolConstants.CONTENT_LENGTH);
+            // TODO: pay attention to keep-alive and close headers
+        }
+        else
+        {
+            logger_.warn("in HttpConnection::processInput_() Message type is not set");
+            return false;
+        }
+
+        if (len != null)
+        {
+            contentLength_ = parseContentLength_(len);
+        }
+        return true;
+    }
+
+    /*
+     * Derives shouldClose_ from the request's protocol version and its
+     * Connection header.
+     */
+    private void updateShouldClose_()
+    {
+        String version = currentRequest_.getVersion();
+        String connection = currentRequest_.getHeader(HttpProtocolConstants.CONNECTION);
+
+        if (version.equalsIgnoreCase("HTTP/1.1"))
+        {
+            /* Persistent unless the client asks to close */
+            if (connection != null && connection.equalsIgnoreCase(HttpProtocolConstants.CLOSE))
+            {
+                shouldClose_ = true;
+            }
+        }
+        else if (version.equalsIgnoreCase("HTTP/1.0"))
+        {
+            /* By default no keep-alive, unless the client asks for it */
+            shouldClose_ = !(connection != null && connection.equalsIgnoreCase(HttpProtocolConstants.KEEP_ALIVE));
+        }
+        else
+        {
+            /* Assume 0.9 */
+            shouldClose_ = true;
+        }
+    }
+
+    /*
+     * Converts a Content-Length header value into a byte count. An empty
+     * value counts as zero. A value that is not a number, or is negative,
+     * is rejected: a negative length could never be satisfied and would
+     * leave the parser waiting in IN_BODY forever.
+     */
+    private static int parseContentLength_(String len) throws HttpParsingException
+    {
+        if (len.length() == 0)
+        {
+            return 0;
+        }
+        int length;
+        try
+        {
+            length = Integer.parseInt(len);
+        }
+        catch (NumberFormatException ex)
+        {
+            throw new HttpParsingException();
+        }
+        if (length < 0)
+        {
+            throw new HttpParsingException();
+        }
+        return length;
+    }
+
+    /*
+     * Hands a fully parsed message to its consumer. Requests get their body
+     * and a default Content-Type (when none was sent) before being passed to
+     * handleRequest(); responses are not expected on this connection.
+     */
+    private void dispatchCompletedMessage_()
+    {
+        if (currentMsgType_ == HttpMessageType.REQUEST)
+        {
+            currentRequest_.setBody(bodyBuffers_);
+
+            if (currentRequest_.getHeader("Content-Type") == null)
+            {
+                currentRequest_.addHeader("Content-Type", defaultContentType_);
+            }
+
+            handleRequest(currentRequest_);
+        }
+        else if (currentMsgType_ == HttpMessageType.RESPONSE)
+        {
+            logger_.info("Holy shit! We are not supposed to be here - ever !!!");
+        }
+        else
+        {
+            logger_.error("Http message type is still" +
+                          " unset after we finish parsing the body?");
+        }
+    }
+
+    /*
+     * Writes the whole buffer to the client and then closes the connection.
+     *
+     * The connection is closed in a finally block so that a failed write
+     * does not leave a dead connection registered with the selector.
+     *
+     * TODO: Make this a non blocking write.
+     */
+    public void write(ByteBuffer buffer)
+    {
+        try
+        {
+            while ( buffer.remaining() > 0 )
+            {
+                httpChannel_.write(buffer);
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+        finally
+        {
+            close();
+        }
+    }
+
+ /*
+ * Wraps the parsed request together with this connection in a Message
+ * addressed to the HTTP stage and verb handler, and passes it to
+ * MessagingService.receive().
+ */
+ private void handleRequest(HttpRequest request)
+ {
+ HttpConnection.HttpRequestMessage httpRequestMessage = new HttpConnection.HttpRequestMessage(request, this);
+ Message httpMessage = new Message(null, HttpConnection.httpStage_, HttpConnection.httpRequestVerbHandler_, new Object[]{httpRequestMessage});
+ MessagingService.receive(httpMessage);
+ }
+
+ // HttpStartLineParser.Callback interface implementation
+ public void onStartLine(String method, String path, String query, String version)
+ {
+ logger_.debug("Startline method=" + method + " path=" + path + " query=" + query + " version=" + version);
+
+ if (method.startsWith("HTTP"))
+ {
+ // response
+ currentMsgType_ = HttpMessageType.RESPONSE;
+ currentResponse_ = new HttpResponse();
+ currentResponse_.setStartLine(method, path, version);
+ }
+ else
+ {
+ // request
+ currentMsgType_ = HttpMessageType.REQUEST;
+ currentRequest_ = new HttpRequest();
+ currentRequest_.setStartLine(method, path, query, version);
+ }
+ }
+
+ // HttpHeaderParser.Callback interface implementation
+ /*
+ * Called for every parsed header line; stores the header on whichever
+ * message (request or response) is currently being built.
+ */
+ public void onHeader(String name, String value)
+ {
+ if (currentMsgType_ == HttpMessageType.REQUEST)
+ {
+ currentRequest_.addHeader(name, value);
+ }
+ else if (currentMsgType_ == HttpMessageType.RESPONSE)
+ {
+ currentResponse_.addHeader(name, value);
+ }
+ else
+ {
+ /* The start line callback has not set a message type; the header is dropped */
+ logger_.warn("Unknown message type -- HttpConnection::onHeader()");
+ }
+
+ logger_.debug(name + " : " + value);
+ }
+}
+
+
+
Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnectionHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnectionHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpConnectionHandler.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.SelectionKeyHandler;
+import org.apache.cassandra.net.SelectorManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Accepts incoming HTTP client connections on the listening socket and
+ * registers each one with the selector, handled by a new HttpConnection.
+ */
+public class HttpConnectionHandler extends SelectionKeyHandler
+{
+    private static Logger logger_ = Logger.getLogger(HttpConnectionHandler.class);
+
+    /**
+     * Accepts one pending connection, switches it to non-blocking mode and
+     * registers it for read events. If setup fails after the connection has
+     * been accepted, the accepted channel is closed so it does not leak.
+     *
+     * @param key selection key of the listening server socket channel
+     */
+    public void accept(SelectionKey key)
+    {
+        SocketChannel client = null;
+        try
+        {
+            ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
+            client = serverChannel.accept();
+            if ( client != null )
+            {
+                client.configureBlocking(false);
+                SelectionKeyHandler handler = new HttpConnection();
+                SelectorManager.getSelectorManager().register(client, handler, SelectionKey.OP_READ);
+            }
+        }
+        catch(IOException e)
+        {
+            logger_.warn(e);
+            closeQuietly_(client);
+        }
+    }
+
+    /** Closes a half-initialised client channel; a failure to close is only logged. */
+    private static void closeQuietly_(SocketChannel client)
+    {
+        if ( client == null )
+        {
+            return;
+        }
+        try
+        {
+            client.close();
+        }
+        catch(IOException e)
+        {
+            logger_.warn(e);
+        }
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpHeaderParser.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpHeaderParser.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpHeaderParser.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpHeaderParser.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * @author kranganathan
+ */
+/**
+ * A parser for HTTP header lines.
+ *
+ */
+public class HttpHeaderParser
+{
+
+ private Callback callback_;
+
+ /**
+ * Receiver for parsed header lines.
+ */
+ public interface Callback
+ {
+
+ /**
+ * Invoked once per completed header line; the parser passes the key
+ * and the value with surrounding whitespace trimmed.
+ */
+ public void onHeader(String key, String value);
+ }
+
+ /**
+ * @param cb receiver for parsed headers; may be null, in which case
+ * headers are parsed and discarded
+ */
+ public HttpHeaderParser(Callback cb)
+ {
+ callback_ = cb;
+ }
+
+ /**
+ * States of the incremental header parser. The two *_WITH_READ_SLASH_R
+ * states record that a '\r' was consumed at the very end of a buffer, so
+ * the matching '\n' has to be looked for at the start of the next one.
+ */
+ enum HeaderParseState
+ {
+ // we are at the very beginning of the line
+ START_OF_HEADER_LINE,
+ // are at line beginning, read '\r' but ran out of bytes in this round
+ START_OF_HEADER_LINE_WITH_READ_SLASH_R,
+ // we are in the process of parsing a header key
+ IN_HEADER_KEY,
+ // eat whitespace after the ':' but before the value
+ PRE_HEADER_VALUE_WHITESPACE,
+ // we are in the process of parsing a header value
+ IN_HEADER_VALUE,
+ // were in IN_HEADER_VALUE and read '\r' but ran out of more bytes
+ IN_HEADER_VALUE_WITH_READ_SLASH_R,
+ /*
+ * got \r\n in the header value. now consider whether it's a multi-line
+ * value. For example,
+ *
+ * HeaderKey: HeaderValue\r\n this is still part of the value\r\n
+ *
+ * is a valid HTTP header line with value
+ *
+ * HeaderValue\r\n this is still part of the value
+ *
+ * NOTE: while all whitespace should generally be condensed into a
+ * single space by the HTTP standard, we will just preserve all of the
+ * whitespace for now
+ *
+ * TODO: consider replacing all whitespace with a single space
+ *
+ * TODO: this parser doesn't correctly preserve the \r\n, should it?
+ */
+ CHECKING_END_OF_VALUE,
+ // we are just about to reset the state of the header parser
+ TO_RESET
+ }
+
+ // the current state of the parser
+ private HeaderParseState parseState_ = HeaderParseState.TO_RESET;
+ // incrementally build up this HTTP header key as we read it
+ private StringBuilder headerKey_ = new StringBuilder(32);
+
+ // incrementally build up this HTTP header value as we read it
+ private StringBuilder headerValue_ = new StringBuilder(64);
+
+ /**
+ * Discards any partially read key and value and positions the parser
+ * at the start of a header line.
+ */
+ public void resetParserState()
+ {
+ headerKey_.setLength(0);
+ headerValue_.setLength(0);
+ parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+ }
+
+ private void finishCurrentHeader_()
+ {
+ if (callback_ != null)
+ {
+ callback_.onHeader(headerKey_.toString().trim(), headerValue_
+ .toString().trim());
+ }
+ resetParserState();
+ }
+
+ /**
+ * Stream-based variant of the header parser. Consumes bytes while
+ * in.available() reports data, invoking the callback for each completed
+ * header line.
+ *
+ * Uses in.mark(1) / in.reset() to push back a single byte, so the stream
+ * must support mark/reset.
+ *
+ * NOTE(review): the two *_WITH_READ_SLASH_R states are not handled by
+ * this method's switch (they would reach the default branch); this method
+ * itself never enters them - only onMoreBytesNew() does.
+ *
+ * @return true once the blank line ending the header block has been
+ * consumed; false when the available bytes ran out first
+ */
+ public boolean onMoreBytes(InputStream in) throws IOException
+ {
+ int got;
+
+ if (parseState_ == HeaderParseState.TO_RESET)
+ {
+ resetParserState();
+ }
+
+ while (in.available() > 0)
+ {
+ // mark before reading so the byte can be pushed back with reset()
+ in.mark(1);
+ got = in.read();
+
+ switch (parseState_)
+ {
+
+ case START_OF_HEADER_LINE:
+ switch (got)
+ {
+ case '\r':
+ if (in.available() > 0)
+ {
+ in.mark(1);
+ got = in.read();
+
+ if (got == '\n')
+ {
+ // "\r\n" at the start of a line: end of the header block
+ parseState_ = HeaderParseState.TO_RESET;
+ return true;
+ } // TODO: determine whether this \r-eating is valid
+ else
+ {
+ in.reset();
+ }
+ } // wait for more data to make this decision
+ else
+ {
+ // push the '\r' back so it is seen again on the next call
+ in.reset();
+ return false;
+ }
+ break;
+
+ default:
+ // not a '\r': push the byte back and parse it as part of the key
+ in.reset();
+ parseState_ = HeaderParseState.IN_HEADER_KEY;
+ break;
+ }
+ break;
+
+ case IN_HEADER_KEY:
+ switch (got)
+ {
+ case ':':
+ parseState_ = HeaderParseState.PRE_HEADER_VALUE_WHITESPACE;
+ break;
+ // TODO: find out: whether to eat whitespace before a :
+ default:
+ headerKey_.append((char) got);
+ break;
+ }
+ break;
+
+ case PRE_HEADER_VALUE_WHITESPACE:
+ switch (got)
+ {
+ case ' ':
+ case '\t':
+ break;
+ default:
+ // first non-whitespace byte belongs to the value
+ in.reset();
+ parseState_ = HeaderParseState.IN_HEADER_VALUE;
+ break;
+ }
+ break;
+
+ case IN_HEADER_VALUE:
+ switch (got)
+ {
+ case '\r':
+ if (in.available() > 0)
+ {
+ in.mark(1);
+ got = in.read();
+
+ if (got == '\n')
+ {
+ parseState_ = HeaderParseState.CHECKING_END_OF_VALUE;
+ break;
+ } // TODO: determine whether this \r-eating is valid
+ else
+ {
+ in.reset();
+ }
+ }
+ else
+ {
+ // push the '\r' back and wait for more data
+ in.reset();
+ return false;
+ }
+ break;
+ default:
+ headerValue_.append((char) got);
+ break;
+ }
+ break;
+
+ case CHECKING_END_OF_VALUE:
+ switch (got)
+ {
+ case ' ':
+ case '\t':
+ // leading whitespace: the value continues on this line
+ in.reset();
+ parseState_ = HeaderParseState.IN_HEADER_VALUE;
+ break;
+ default:
+ // anything else starts a new line: the current header is complete
+ in.reset();
+ finishCurrentHeader_();
+ }
+ break;
+ default:
+ assert false;
+ parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+ break;
+ }
+ }
+
+ return false;
+ }
+
+    /**
+     * Buffer-based header parser. Consumes bytes from the buffer's current
+     * position, invoking the callback for each completed header line, and
+     * leaves the buffer positioned just after the last byte it consumed.
+     * Parsing state is kept between calls, so a header block may be split
+     * across any number of buffers.
+     *
+     * Bytes are widened with "& 0xFF" before being stored as characters, so
+     * each byte maps to the character with the same code (0-255); without
+     * the mask, bytes of 0x80 and above are sign-extended and end up as
+     * U+FF80..U+FFFF.
+     *
+     * @param buffer bytes read from the connection
+     * @return true once the blank line ending the header block has been
+     *         consumed; false when the buffer ran out first
+     */
+    public boolean onMoreBytesNew(ByteBuffer buffer) throws IOException
+    {
+        int got;
+        int limit = buffer.limit();
+        int pos = buffer.position();
+
+        if (parseState_ == HeaderParseState.TO_RESET)
+        {
+            resetParserState();
+        }
+
+        while (pos < limit)
+        {
+            switch (parseState_)
+            {
+                case START_OF_HEADER_LINE:
+                    if ((got = buffer.get(pos)) != '\r')
+                    {
+                        // Not consumed here: the byte is re-read as part of the key.
+                        parseState_ = HeaderParseState.IN_HEADER_KEY;
+                        break;
+                    }
+                    else
+                    {
+                        pos++;
+                        if (pos == limit) // Need more bytes
+                        {
+                            buffer.position(pos);
+                            parseState_ = HeaderParseState.START_OF_HEADER_LINE_WITH_READ_SLASH_R;
+                            return false;
+                        }
+                    }
+                    // fall through
+
+                case START_OF_HEADER_LINE_WITH_READ_SLASH_R:
+                    // Processed "...\r\n\r\n" - headers are complete
+                    if (((char) buffer.get(pos)) == '\n')
+                    {
+                        buffer.position(++pos);
+                        parseState_ = HeaderParseState.TO_RESET;
+                        return true;
+                    } // TODO: determine whether this \r-eating is valid
+                    else
+                    {
+                        parseState_ = HeaderParseState.IN_HEADER_KEY;
+                    }
+                    // fall through
+
+                case IN_HEADER_KEY:
+                    // TODO: find out: whether to eat whitespace before a :
+                    while (pos < limit && (got = buffer.get(pos)) != ':')
+                    {
+                        headerKey_.append((char) (got & 0xFF));
+                        pos++;
+                    }
+                    if (pos < limit)
+                    {
+                        pos++; // eating ':'
+                        parseState_ = HeaderParseState.PRE_HEADER_VALUE_WHITESPACE;
+                    }
+                    break;
+
+                case PRE_HEADER_VALUE_WHITESPACE:
+                    while ((((got = buffer.get(pos)) == ' ') || (got == '\t'))
+                            && (++pos < limit))
+                    {
+                        ;
+                    }
+                    if (pos < limit)
+                    {
+                        parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                    }
+                    break;
+
+                case IN_HEADER_VALUE:
+                    while (pos < limit && (got = buffer.get(pos)) != '\r')
+                    {
+                        headerValue_.append((char) (got & 0xFF));
+                        pos++;
+                    }
+                    if (pos == limit)
+                    {
+                        break;
+                    }
+
+                    pos++; // eating '\r'
+                    if (pos == limit)
+                    {
+                        // The matching '\n' (if any) arrives with the next buffer.
+                        parseState_ = HeaderParseState.IN_HEADER_VALUE_WITH_READ_SLASH_R;
+                        break;
+                    }
+                    // fall through
+
+                case IN_HEADER_VALUE_WITH_READ_SLASH_R:
+                    if (((char) buffer.get(pos)) == '\n')
+                    {
+                        parseState_ = HeaderParseState.CHECKING_END_OF_VALUE;
+                        pos++;
+                    } // TODO: determine whether this \r-eating is valid
+                    else
+                    {
+                        parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                    }
+                    break;
+
+                case CHECKING_END_OF_VALUE:
+                    switch ((char) buffer.get(pos))
+                    {
+                        case ' ':
+                        case '\t':
+                            // Leading whitespace: the value continues on this line.
+                            parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                            break;
+
+                        default:
+                            // Processed "headerKey headerValue\r\n"
+                            finishCurrentHeader_();
+                    }
+                    break;
+
+                default:
+                    assert false;
+                    parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+                    break;
+            }
+        }
+        // Need to read more bytes - get next buffer
+        buffer.position(pos);
+        return false;
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpParsingException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpParsingException.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpParsingException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpParsingException.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Exception thrown by the HTTP parsers when the input is malformed.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author kranganathan
+ */
+
+public class HttpParsingException extends IOException
+{
+ private static final long serialVersionUID = 1L;
+}
+
+
Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpProtocolConstants.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpProtocolConstants.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpProtocolConstants.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpProtocolConstants.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * String constants for HTTP header names and values.
+ */
+
+package org.apache.cassandra.net.http;
+
+/**
+ * Header names and header values used by the HTTP classes.
+ *
+ * Fields of an interface are implicitly public, static and final, so the
+ * modifiers are not repeated on each constant.
+ *
+ * @author kranganathan
+ */
+public interface HttpProtocolConstants
+{
+    /** The "Connection" header name. */
+    String CONNECTION = "Connection";
+
+    /** The "Content-Length" header name. */
+    String CONTENT_LENGTH = "Content-Length";
+
+    /** The "close" token. */
+    String CLOSE = "close";
+
+    /** The "Keep-Alive" token. */
+    String KEEP_ALIVE = "Keep-Alive";
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpRequest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpRequest.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpRequest.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpRequest.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Encapsulates a HTTP request.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.io.*;
+import java.net.URLDecoder;
+import java.nio.*;
+
+/**
+ * Encapsulates a HTTP request: the start line, the headers, the body and
+ * the parameters parsed out of the body.
+ *
+ * @author kranganathan
+ */
+public class HttpRequest
+{
+    private final Map<String, String> headersMap_ = new HashMap<String, String>();
+    private final Map<String, String> paramsMap_ = new HashMap<String, String>();
+    private String sBody_ = "";
+    private String method_;
+    private String path_;
+    private String query_;
+    private String version_;
+
+    /*
+     * Returns the type of method - GET, POST, etc.
+     */
+    public String getMethod()
+    {
+        return method_;
+    }
+
+    /*
+     * Gets the request path referenced by the request.
+     * For example, if the URL is of the form:
+     *  http://somedomain:PORT/some/path?param=value
+     * this function will return
+     *  "/some/path"
+     */
+    public String getPath()
+    {
+        return path_;
+    }
+
+    /*
+     * Gets the query in the request, as it was passed to setStartLine.
+     * For example, if the URL is of the form:
+     *  http://somedomain:PORT/some/path?param=value
+     * this function will return
+     *  "param=value"
+     */
+    public String getQuery()
+    {
+        return query_;
+    }
+
+    /*
+     * Returns the supported HTTP protocol version. This is always
+     * "HTTP/1.1", whatever version was passed to setStartLine.
+     */
+    public String getVersion()
+    {
+        return "HTTP/1.1";
+    }
+
+    /*
+     * This function adds to the map of header name-values
+     * in the HTTP request. A second value for the same name replaces
+     * the first one.
+     */
+    public void addHeader(String name, String value)
+    {
+        headersMap_.put(name, value);
+    }
+
+    /*
+     * For a given name, returns the value if it was in the
+     * headers. Returns the empty string otherwise. The lookup is
+     * case sensitive.
+     */
+    public String getHeader(String name)
+    {
+        String value = headersMap_.get(name);
+        return (value == null) ? "" : value;
+    }
+
+    /*
+     * Stores a parameter. The value is URL-decoded as UTF-8 first; when it
+     * cannot be decoded it is stored as it was received. The name is
+     * stored unchanged.
+     */
+    public void setParameter(String name, String value)
+    {
+        String decodedValue = value;
+        if (value != null)
+        {
+            try
+            {
+                decodedValue = URLDecoder.decode(value, "UTF-8");
+            }
+            catch (UnsupportedEncodingException ignored)
+            {
+                // cannot happen, every Java platform supports UTF-8
+            }
+            catch (IllegalArgumentException ignored)
+            {
+                // malformed % escape: keep the raw value rather than lose it
+            }
+        }
+        paramsMap_.put(name, decodedValue);
+    }
+
+    /*
+     * This function gets the parameters from the body of the HTTP message.
+     * Returns the value for the parameter if one exists or returns null.
+     *
+     * For example, if the body is of the form:
+     *  a=b&c=d
+     * this function will:
+     *  return "b" when called as getParameter("a")
+     */
+    public String getParameter(String name)
+    {
+        return paramsMap_.get(name);
+    }
+
+    /*
+     * Appends the content of the byte buffers passed in to the sBody_
+     * variable, then parses all the parameters in the body.
+     *
+     * The remaining bytes of every buffer are consumed. A null list is
+     * ignored, and so are null or empty buffers inside the list. The bytes
+     * are converted with the platform default charset.
+     */
+    public void setBody(List<ByteBuffer> bodyBuffers)
+    {
+        if (bodyBuffers == null)
+            return;
+
+        // Collect the raw bytes of all buffers before decoding, so that a
+        // multi-byte character split across two buffers is decoded correctly.
+        ByteArrayOutputStream rawBody = new ByteArrayOutputStream();
+        for (ByteBuffer bb : bodyBuffers)
+        {
+            if (bb == null || !bb.hasRemaining())
+            {
+                continue;
+            }
+            byte[] chunk = new byte[bb.remaining()];
+            bb.get(chunk);
+            rawBody.write(chunk, 0, chunk.length);
+        }
+
+        // add the string to the body
+        sBody_ += rawBody.toString();
+
+        // once we are done with the body, parse the parameters
+        parseParameters_(sBody_);
+    }
+
+    /*
+     * Splits a body of the form a=b&c=d into parameters.
+     *
+     * Only the first '=' separates name and value, so a value may contain
+     * '='. "a=" stores the empty string for "a". A piece without any '='
+     * is skipped and does not stop the remaining pieces from being parsed.
+     */
+    private void parseParameters_(String body)
+    {
+        if (body.length() == 0)
+            return;
+
+        String[] pairs = body.split("&");
+        for (int i = 0; i < pairs.length; ++i)
+        {
+            int separator = pairs[i].indexOf('=');
+            if (separator < 0)
+            {
+                continue;
+            }
+            setParameter(pairs[i].substring(0, separator), pairs[i].substring(separator + 1));
+        }
+    }
+
+    public String getBody()
+    {
+        return sBody_;
+    }
+
+    /*
+     * Records the components of the request's start line.
+     */
+    public void setStartLine(String method, String path, String query, String version)
+    {
+        method_ = method;
+        path_ = path;
+        query_ = query;
+        version_ = version;
+    }
+
+    /*
+     * Multi-line dump of the start line, headers and body, for debugging.
+     */
+    public String toString()
+    {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+
+        pw.println("HttpRequest-------->");
+        pw.println("method = " + method_ + ", path = " + path_ + ", query = " + query_ + ", version = " + version_);
+        pw.println("Headers: " + headersMap_.toString());
+        pw.println("Body: " + sBody_);
+        pw.println("<--------HttpRequest");
+
+        return sw.toString();
+    }
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpResponse.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpResponse.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpResponse.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Encapsulates a HTTP response.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.io.*;
+import java.nio.*;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpResponse
+{
+ private Map<String, String> headersMap_ = new HashMap<String, String>();;
+ private String sBody_ = null;
+ private String method_ = null;
+ private String path_ = null;
+ private String version_ = null;
+
+
+ public String getMethod()
+ {
+ return method_;
+ }
+
+ public String getPath()
+ {
+ return path_;
+ }
+
+ public String getVersion()
+ {
+ return "HTTP/1.1";
+ }
+
+ public void addHeader(String name, String value)
+ {
+ headersMap_.put(name, value);
+ }
+
+ public String getHeader(String name)
+ {
+ return headersMap_.get(name).toString();
+ }
+
+ public void setBody(List<ByteBuffer> bodyBuffers)
+ {
+ StringBuffer sb = new StringBuffer();
+ while(bodyBuffers.size() > 0)
+ {
+ sb.append(bodyBuffers.remove(0).asCharBuffer().toString());
+ }
+ sBody_ = sb.toString();
+ }
+
+ public String getBody()
+ {
+ return sBody_;
+ }
+
+ public void setStartLine(String method, String path, String version)
+ {
+ method_ = method;
+ path_ = path;
+ version_ = version;
+ }
+
+ public String toString()
+ {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+
+ pw.println("HttpResponse-------->");
+ pw.println("method = " + method_ + ", path = " + path_ + ", version = " + version_);
+ pw.println("Headers: " + headersMap_.toString());
+ pw.println("Body: " + sBody_);
+ pw.println("<--------HttpResponse");
+
+ return sw.toString();
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpStartLineParser.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpStartLineParser.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpStartLineParser.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpStartLineParser.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Incremental parser for the start line of a HTTP request.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpStartLineParser
+{
+ private Callback callback_;
+
+ public interface Callback
+ {
+ void onStartLine(String method, String path, String query, String version);
+ };
+
+ public HttpStartLineParser(Callback cb)
+ {
+ callback_ = cb;
+ }
+
+ private enum StartLineParseState
+ {
+ EATING_WHITESPACE,
+ READING_METHOD,
+ READING_PATH,
+ READING_QUERY,
+ DECODING_FIRST_CHAR,
+ DECODING_SECOND_CHAR,
+ READING_VERSION,
+ CHECKING_EOL,
+ TO_RESET
+ }
+
+ private StartLineParseState parseState_ = StartLineParseState.TO_RESET;
+ private StartLineParseState nextState_;
+ private StringBuilder httpMethod_ = new StringBuilder(32);
+ private StringBuilder httpPath_ = new StringBuilder();
+ private StringBuilder httpQuery_ = new StringBuilder(32);
+ private StringBuilder httpVersion_ = new StringBuilder();
+
+ // we will encode things of the form %{2 digit hex number} and this is a
+ // temporary holder for the leftmost digit's value as the second digit is
+ // being read
+ private int encodedValue_;
+ // this is a pointer to one of httpMethod_, httpPath_, httpQuery_,
+ // httpVersion_ so that the encoded value can be appended to the correct
+ // buffer
+ private StringBuilder encodeTo_;
+
+ public void resetParserState() {
+ httpMethod_.setLength(0);
+ httpPath_.setLength(0);
+ httpQuery_.setLength(0);
+ httpVersion_.setLength(0);
+
+ parseState_ = StartLineParseState.EATING_WHITESPACE;
+ nextState_ = StartLineParseState.READING_METHOD;
+ }
+
+ private void finishLine_()
+ {
+ if (callback_ != null)
+ {
+ callback_.onStartLine(
+ httpMethod_.toString(),
+ httpPath_.toString(),
+ httpQuery_.toString(),
+ httpVersion_.toString()
+ );
+ }
+ }
+
+ private static int decodeHex(int hex)
+ {
+ if (hex >= '0' && hex <= '9')
+ {
+ return hex-'0';
+ }
+ else if (hex >= 'a' && hex <= 'f')
+ {
+ return hex-'a'+10;
+ }
+ else if (hex >= 'A' && hex <= 'F')
+ {
+ return hex-'A'+10;
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ public boolean onMoreBytes(InputStream in) throws HttpParsingException, IOException
+ {
+ int got;
+
+ if (parseState_ == StartLineParseState.TO_RESET)
+ {
+ resetParserState();
+ }
+
+ while (in.available() > 0)
+ {
+ in.mark(1);
+ got = in.read();
+
+ switch (parseState_)
+ {
+ case EATING_WHITESPACE:
+ switch (got)
+ {
+ case ' ':
+ break;
+ default:
+ in.reset();
+ parseState_ = nextState_;
+ break;
+ }
+ break;
+
+ case READING_METHOD:
+ switch (got)
+ {
+ case ' ':
+ parseState_ = StartLineParseState.EATING_WHITESPACE;
+ nextState_ = StartLineParseState.READING_PATH;
+ break;
+ default:
+ httpMethod_.append((char) got);
+ break;
+ }
+ break;
+
+ case READING_PATH:
+ switch (got)
+ {
+ case '\r':
+ parseState_ = StartLineParseState.CHECKING_EOL;
+ break;
+ case '%':
+ encodeTo_ = httpPath_;
+ nextState_ = parseState_;
+ parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+ break;
+ case ' ':
+ parseState_ = StartLineParseState.EATING_WHITESPACE;
+ nextState_ = StartLineParseState.READING_VERSION;
+ break;
+ case '?':
+ parseState_ = StartLineParseState.READING_QUERY;
+ break;
+ default:
+ httpPath_.append((char) got);
+ break;
+ }
+ break;
+
+ case READING_QUERY:
+ switch (got)
+ {
+ case '\r':
+ parseState_ = StartLineParseState.CHECKING_EOL;
+ break;
+ case '%':
+ encodeTo_ = httpQuery_;
+ nextState_ = parseState_;
+ parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+ break;
+ case ' ':
+ parseState_ = StartLineParseState.EATING_WHITESPACE;
+ nextState_ = StartLineParseState.READING_VERSION;
+ break;
+ case '+':
+ httpQuery_.append(' ');
+ break;
+ default:
+ httpQuery_.append((char) got);
+ break;
+ }
+ break;
+
+ case DECODING_FIRST_CHAR:
+ encodedValue_ = decodeHex(got) * 16;
+ parseState_ = StartLineParseState.DECODING_SECOND_CHAR;
+ break;
+
+ case DECODING_SECOND_CHAR:
+ encodeTo_.append((char) (decodeHex(got) + encodedValue_));
+ parseState_ = nextState_;
+ break;
+
+ case READING_VERSION:
+ switch (got)
+ {
+ case '\r':
+ parseState_ = StartLineParseState.CHECKING_EOL;
+ break;
+ default:
+ httpVersion_.append((char) got);
+ break;
+ }
+ break;
+
+ case CHECKING_EOL:
+ switch (got)
+ {
+ case '\n':
+ finishLine_();
+ parseState_ = StartLineParseState.TO_RESET;
+ return true;
+ default:
+ throw new HttpParsingException();
+ }
+
+ default:
+ throw new HttpParsingException();
+ }
+ }
+
+ return false;
+ }
+
+ public boolean onMoreBytesNew(ByteBuffer buffer) throws HttpParsingException, IOException
+ {
+ int got;
+ int limit = buffer.limit();
+ int pos = buffer.position();
+
+ if (parseState_ == StartLineParseState.TO_RESET)
+ {
+ resetParserState();
+ }
+
+ while(pos < limit)
+ {
+ switch(parseState_)
+ {
+ case EATING_WHITESPACE:
+ while((char)buffer.get(pos) == ' ' && ++pos < limit);
+ if(pos < limit)
+ parseState_ = nextState_;
+ break;
+
+ case READING_METHOD:
+ while(pos < limit && (got = buffer.get(pos)) != ' ')
+ {
+ httpMethod_.append((char)got);
+ pos++;
+ }
+
+ if(pos < limit)
+ {
+ parseState_ = StartLineParseState.EATING_WHITESPACE;
+ nextState_ = StartLineParseState.READING_PATH;
+ }
+ break;
+
+ case READING_PATH:
+ while(pos < limit && parseState_ == StartLineParseState.READING_PATH)
+ {
+ got = buffer.get(pos++);
+
+ switch (got)
+ {
+ case '\r':
+ parseState_ = StartLineParseState.CHECKING_EOL;
+ break;
+ case '%':
+ encodeTo_ = httpPath_;
+ nextState_ = parseState_;
+ parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+ break;
+ case ' ':
+ parseState_ = StartLineParseState.EATING_WHITESPACE;
+ nextState_ = StartLineParseState.READING_VERSION;
+ break;
+ case '?':
+ parseState_ = StartLineParseState.READING_QUERY;
+ break;
+ default:
+ httpPath_.append((char) got);
+ break;
+ }
+ }
+ break;
+
+ case READING_QUERY:
+ while(pos < limit && parseState_ == StartLineParseState.READING_QUERY)
+ {
+ got = buffer.get(pos++);
+
+ switch (got)
+ {
+ case '\r':
+ parseState_ = StartLineParseState.CHECKING_EOL;
+ break;
+ case '%':
+ encodeTo_ = httpQuery_;
+ nextState_ = parseState_;
+ parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+ break;
+ case ' ':
+ parseState_ = StartLineParseState.EATING_WHITESPACE;
+ nextState_ = StartLineParseState.READING_VERSION;
+ break;
+ case '+':
+ httpQuery_.append(' ');
+ break;
+ default:
+ httpQuery_.append((char) got);
+ break;
+ }
+ }
+ break;
+
+ case DECODING_FIRST_CHAR:
+ got = (int)buffer.get(pos++);
+ encodedValue_ = decodeHex(got) * 16;
+ parseState_ = StartLineParseState.DECODING_SECOND_CHAR;
+ break;
+
+ case DECODING_SECOND_CHAR:
+ got = (int)buffer.get(pos++);
+ encodeTo_.append((char) (decodeHex(got) + encodedValue_));
+ parseState_ = nextState_;
+ break;
+
+ case READING_VERSION:
+ while(pos < limit && (got = buffer.get(pos)) != '\r' )
+ {
+ httpVersion_.append((char)got);
+ pos++;
+ }
+ if(pos < limit)
+ {
+ parseState_ = StartLineParseState.CHECKING_EOL;
+ pos++; // skipping '\r'
+ }
+ break;
+
+ case CHECKING_EOL:
+ switch (buffer.get(pos++))
+ {
+ case '\n':
+ finishLine_();
+ parseState_ = StartLineParseState.TO_RESET;
+ buffer.position(pos);
+ return true; //could have reached limit here
+ default:
+ throw new HttpParsingException();
+ }
+
+ default:
+ throw new HttpParsingException();
+ }
+ }
+
+ buffer.position(pos);
+ return false;
+ }
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/net/http/HttpWriteResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/HttpWriteResponse.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/HttpWriteResponse.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/HttpWriteResponse.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class writes the HTTP 1.1 responses back to the client.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.io.*;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpWriteResponse
+{
+ private HttpRequest httpRequest_ = null;
+ private StringBuilder body_ = new StringBuilder();
+
+ public HttpWriteResponse(HttpRequest httpRequest)
+ {
+ httpRequest_ = httpRequest;
+ }
+
+ public void println(String responseLine)
+ {
+ if(responseLine != null)
+ {
+ body_.append(responseLine);
+ body_.append( System.getProperty("line.separator"));
+ }
+ }
+
+ public ByteBuffer flush() throws Exception
+ {
+ StringBuilder sb = new StringBuilder();
+ // write out the HTTP response headers first
+ sb.append(httpRequest_.getVersion() + " 200 OK\r\n");
+ sb.append("Content-Type: text/html\r\n");
+ if(body_.length() > 0)
+ sb.append("Content-Length: " + body_.length() + "\r\n");
+ sb.append("Cache-Control: no-cache\r\n");
+ sb.append("Pragma: no-cache\r\n");
+
+ // terminate the headers
+ sb.append("\r\n");
+
+ // now write out the HTTP response body
+ if(body_.length() > 0)
+ sb.append(body_.toString());
+
+ // terminate the body
+ //sb.append("\r\n");
+ //sb.append("\r\n");
+ ByteBuffer buffer = ByteBuffer.wrap(sb.toString().getBytes());
+ return buffer;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/ContentLengthState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ContentLengthState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ContentLengthState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ContentLengthState.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+
+import org.apache.cassandra.utils.FBUtilities;
+/**
+ * Reader state that reads four bytes, turns them into an int and hands
+ * that int to the CONTENT state as the size to read next.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ContentLengthState extends StartState
+{
+    /* Holds the four bytes of the size. */
+    private ByteBuffer buffer_;
+
+    ContentLengthState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(4);
+    }
+
+    /* Delegates to doRead of the parent class, passing the four byte buffer. */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /*
+     * Switches the reader to the CONTENT state. The state registered
+     * with the reader is reused when there is one, otherwise a new one is
+     * created and registered. The buffer is cleared afterwards.
+     */
+    public void morphState() throws IOException
+    {
+        int contentLength = FBUtilities.byteArrayToInt(buffer_.array());
+        StartState contentState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT);
+        if ( contentState != null )
+        {
+            contentState.setContextData(contentLength);
+        }
+        else
+        {
+            contentState = new ContentState(stream_, contentLength);
+            stream_.putSocketState( TcpReader.TcpReaderState.CONTENT, contentState );
+        }
+        stream_.morphState( contentState );
+        buffer_.clear();
+    }
+
+    /* This state takes no context data. */
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ContentLengthState");
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/ContentState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ContentState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ContentState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ContentState.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Reader state that reads a body of a known length and hands the bytes to
+ * the DONE state.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ContentState extends StartState
+{
+    private ByteBuffer buffer_;
+    /* Number of bytes of the body currently being read. */
+    private int length_;
+
+    ContentState(TcpReader stream, int length)
+    {
+        super(stream);
+        length_ = length;
+        buffer_ = ByteBuffer.allocate(length_);
+    }
+
+    /* Delegates to doRead of the parent class, passing the body buffer. */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /*
+     * Switches the reader to the DONE state and gives it a copy of the
+     * bytes read. The state registered with the reader is reused when
+     * there is one, otherwise a new one is created and registered.
+     */
+    public void morphState() throws IOException
+    {
+        StartState doneState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+        if ( doneState != null )
+        {
+            doneState.setContextData(toBytes());
+        }
+        else
+        {
+            doneState = new DoneState(stream_, toBytes());
+            stream_.putSocketState( TcpReader.TcpReaderState.DONE, doneState );
+        }
+        stream_.morphState( doneState );
+    }
+
+    /*
+     * Copies the first length_ bytes of the buffer into a new array.
+     * The backing array is not returned directly as it can be longer
+     * than length_ when the buffer is reused for a shorter body.
+     */
+    private byte[] toBytes()
+    {
+        buffer_.position(0);
+        byte[] copy = new byte[length_];
+        buffer_.get(copy, 0, length_);
+        return copy;
+    }
+
+    /*
+     * Prepares the state for the next body. data is the length of that
+     * body as an Integer. The buffer is reused when it is large enough
+     * and replaced by a new one otherwise.
+     */
+    public void setContextData(Object data)
+    {
+        length_ = (Integer)data;
+        buffer_.clear();
+        if ( buffer_.capacity() >= length_ )
+        {
+            buffer_.limit(length_);
+        }
+        else
+        {
+            buffer_ = ByteBuffer.allocate(length_);
+        }
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/ContentStreamState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ContentStreamState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ContentStreamState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ContentStreamState.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.io.*;
+
+import org.apache.cassandra.db.Table;
+import org.apache.log4j.Logger;
+
+/**
+ * Reader state that writes the bytes arriving on the socket straight into
+ * the target file of the stream context registered for the remote host.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+class ContentStreamState extends StartState
+{
+    private static final Logger logger_ = Logger.getLogger(ContentStreamState.class);
+    /* Upper bound on the number of bytes moved by one transferFrom call. */
+    private static final long count_ = 64*1024*1024;
+    /* Return this byte array to exit event loop */
+    private static final byte[] bytes_ = new byte[1];
+    /* Bytes written to the target file so far, also the next write offset. */
+    private long bytesRead_ = 0L;
+    /* Channel of the target file, null while no file is open. */
+    private FileChannel fc_;
+    private StreamContextManager.StreamContext streamContext_;
+    private StreamContextManager.StreamStatus streamStatus_;
+
+    /*
+     * Looks up the stream context and the stream status by the host name
+     * of the remote end of the reader's socket.
+     */
+    ContentStreamState(TcpReader stream)
+    {
+        super(stream);
+        SocketChannel socketChannel = stream.getStream();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        String remoteHost = remoteAddress.getHostName();
+        streamContext_ = StreamContextManager.getStreamContext(remoteHost);
+        streamStatus_ = StreamContextManager.getStreamStatus(remoteHost);
+    }
+
+    /*
+     * Opens the target file for appending, unless it is open already.
+     * streamContext_ must not be null.
+     */
+    private void createFileChannel() throws IOException
+    {
+        if ( fc_ == null )
+        {
+            logger_.debug("Creating file for " + streamContext_.getTargetFile());
+            FileOutputStream fos = new FileOutputStream( streamContext_.getTargetFile(), true );
+            fc_ = fos.getChannel();
+        }
+    }
+
+    /*
+     * Closes the target file, if it is open, and forgets the channel so
+     * that the next read opens a new one. Failures to close are ignored:
+     * this is only used while another IOException is being handled.
+     */
+    private void closeFileChannelQuietly_()
+    {
+        FileChannel channel = fc_;
+        fc_ = null;
+        if ( channel != null )
+        {
+            try
+            {
+                channel.close();
+            }
+            catch ( IOException ignored )
+            {
+                // the caller rethrows the original failure
+            }
+        }
+    }
+
+    /*
+     * Moves the bytes available on the socket into the target file.
+     *
+     * Throws ReadNotCompleteException as long as fewer bytes than the
+     * stream context expects have been written. Once all of them are
+     * written the completion handler is called, the file is closed and
+     * the reader moves on to the DONE state.
+     *
+     * On an IOException the source is asked to stream the file again, the
+     * file is closed and deleted, and the exception is rethrown.
+     *
+     * Nothing is read when there is no stream context for the remote
+     * host. Always returns an empty array.
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        SocketChannel socketChannel = stream_.getStream();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        String remoteHost = remoteAddress.getHostName();
+        if ( streamContext_ != null )
+        {
+            // open the file only once the context is known to exist,
+            // createFileChannel reads the target file from it
+            createFileChannel();
+            try
+            {
+                bytesRead_ += fc_.transferFrom(socketChannel, bytesRead_, ContentStreamState.count_);
+                if ( bytesRead_ != streamContext_.getExpectedBytes() )
+                    throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+            }
+            catch ( IOException ex )
+            {
+                /* Release the file before it is deleted. */
+                closeFileChannelQuietly_();
+                /* The file is about to go, so nothing has been written to it any more. */
+                bytesRead_ = 0L;
+                /* Ask the source node to re-stream this file. */
+                streamStatus_.setAction(StreamContextManager.StreamCompletionAction.STREAM);
+                handleStreamCompletion(remoteHost);
+                /* Delete the orphaned file. */
+                File file = new File(streamContext_.getTargetFile());
+                file.delete();
+                throw ex;
+            }
+            if ( bytesRead_ == streamContext_.getExpectedBytes() )
+            {
+                logger_.debug("Removing stream context " + streamContext_);
+                handleStreamCompletion(remoteHost);
+                bytesRead_ = 0L;
+                /* Forget the channel before closing it, a closed channel must not be reused. */
+                FileChannel finished = fc_;
+                fc_ = null;
+                finished.close();
+                morphState();
+            }
+        }
+
+        return new byte[0];
+    }
+
+    private void handleStreamCompletion(String remoteHost) throws IOException
+    {
+        /*
+         * Streaming is complete. If all the data that has to be received inform the sender via
+         * the stream completion callback so that the source may perform the requisite cleanup.
+        */
+        IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
+        if ( streamComplete != null )
+        {
+            streamComplete.onStreamCompletion(remoteHost, streamContext_, streamStatus_);
+        }
+    }
+
+    /*
+     * Switches the reader to the DONE state. The state registered with
+     * the reader is reused when there is one, otherwise a new one is
+     * created and registered.
+     */
+    public void morphState() throws IOException
+    {
+        /* We instantiate an array of size 1 so that we can exit the event loop of the read. */
+        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+        if ( nextState == null )
+        {
+            nextState = new DoneState(stream_, ContentStreamState.bytes_);
+            stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
+        }
+        else
+        {
+            nextState.setContextData(ContentStreamState.bytes_);
+        }
+        stream_.morphState( nextState );
+    }
+
+    /* This state takes no context data. */
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ContentStreamState");
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/net/io/DoneState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/DoneState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/DoneState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/DoneState.java Mon Mar 2 06:13:14 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Final reader state. It holds the bytes of a completed read and hands
+ * them out.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class DoneState extends StartState
+{
+    /* The bytes handed out by read(), always set by the constructor. */
+    private byte[] bytes_;
+
+    DoneState(TcpReader stream, byte[] bytes)
+    {
+        super(stream);
+        this.bytes_ = bytes;
+    }
+
+    /*
+     * Moves the reader out of this state first, then returns the bytes
+     * this state holds.
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        this.morphState();
+        return this.bytes_;
+    }
+
+    /* Passes null to the reader's morphState, there is no next state. */
+    public void morphState() throws IOException
+    {
+        stream_.morphState(null);
+    }
+
+    /* data is the byte array to hand out on the next read(). */
+    public void setContextData(Object data)
+    {
+        this.bytes_ = (byte[])data;
+    }
+}