You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [25/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpRequest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpRequest.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpRequest.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpRequest.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Encapsulates an HTTP request.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.io.*;
+import java.net.URLDecoder;
+import java.nio.*;
+
+/**
+ * In-memory representation of a single HTTP request: the start line
+ * (method, path, query, version), the headers, the raw body and the
+ * parameters parsed out of a form-encoded body ("a=b&c=d").
+ *
+ * Instances are plain mutable holders and are not synchronized.
+ *
+ * @author kranganathan
+ */
+public class HttpRequest
+{
+    /* Charset used to decode the body bytes and the URL-encoded parameter values. */
+    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");
+
+    private final Map<String, String> headersMap_ = new HashMap<String, String>();
+    private final Map<String, String> paramsMap_ = new HashMap<String, String>();
+    private String sBody_ = "";
+    private String method_;
+    private String path_;
+    private String query_;
+    private String version_;
+
+    /*
+     * Returns the type of method - GET, POST, etc. - as passed to
+     * setStartLine(), or null if the start line has not been set.
+     */
+    public String getMethod()
+    {
+        return method_;
+    }
+
+    /*
+     * Gets the request path referenced by the request.
+     * For example, if the URL is of the form:
+     *  http://somedomain:PORT/some/path?param=value
+     * this function will return
+     *  "/some/path"
+     */
+    public String getPath()
+    {
+        return path_;
+    }
+
+    /*
+     * Gets the query in the request.
+     * For example, if the URL is of the form:
+     *  http://somedomain:PORT/some/path?param=value
+     * this function will return
+     *  "param=value"
+     */
+    public String getQuery()
+    {
+        return query_;
+    }
+
+    /*
+     * Returns the supported HTTP protocol version. This is always
+     * "HTTP/1.1", independent of the version string received in the
+     * start line (which is only kept for toString()).
+     */
+    public String getVersion()
+    {
+        return "HTTP/1.1";
+    }
+
+    /*
+     * This function adds to the map of header name-values
+     * in the HTTP request. A later value replaces an earlier one
+     * stored under the same (case-sensitive) name.
+     */
+    public void addHeader(String name, String value)
+    {
+        headersMap_.put(name, value);
+    }
+
+    /*
+     * For a given name, returns the value if it was in the
+     * headers. Returns the empty string otherwise.
+     */
+    public String getHeader(String name)
+    {
+        String value = headersMap_.get(name);
+        return (value == null) ? "" : value;
+    }
+
+    /*
+     * Stores a parameter. The value is URL-decoded as UTF-8 first; the
+     * name is stored as given. If the value cannot be decoded (malformed
+     * %-escape) the raw value is stored unchanged. A null value is
+     * stored as null.
+     */
+    public void setParameter(String name, String value)
+    {
+        String decodedValue = value;
+        if (value != null)
+        {
+            try
+            {
+                decodedValue = URLDecoder.decode(value, UTF_8.name());
+            }
+            catch (UnsupportedEncodingException ignored)
+            {
+                // UTF-8 is a charset every JVM must provide; keep the raw value
+            }
+            catch (IllegalArgumentException ignored)
+            {
+                // malformed %-escape: best effort, keep the raw value
+            }
+        }
+        paramsMap_.put(name, decodedValue);
+    }
+
+    /*
+     * This function gets the parameters from the body of the HTTP message.
+     * Returns the value for the parameter if one exists or returns null.
+     *
+     * For example, if the body is of the form:
+     *  a=b&c=d
+     * this function will:
+     * return "b" when called as getParameter("a")
+     */
+    public String getParameter(String name)
+    {
+        return paramsMap_.get(name);
+    }
+
+    /*
+     * Appends the remaining bytes of the given buffers (decoded as UTF-8)
+     * to the body, then re-parses the whole accumulated body as
+     * '&'-separated name=value pairs.
+     *
+     * The buffers are consumed (their position is advanced to their limit).
+     * Null or empty buffers are skipped; a null list is ignored.
+     * A pair without '=' or with nothing after it ("flag", "a=") is stored
+     * with an empty value, and only the first '=' separates name from
+     * value, so "a=b=c" yields a -> "b=c".
+     */
+    public void setBody(List<ByteBuffer> bodyBuffers)
+    {
+        if (bodyBuffers == null)
+        {
+            return;
+        }
+
+        // gather all bytes first so that a multi-byte character split
+        // across two buffers is still decoded correctly
+        ByteArrayOutputStream collected = new ByteArrayOutputStream();
+        for (ByteBuffer bb : bodyBuffers)
+        {
+            if (bb == null || !bb.hasRemaining())
+            {
+                continue;
+            }
+            byte[] chunk = new byte[bb.remaining()];
+            bb.get(chunk);
+            collected.write(chunk, 0, chunk.length);
+        }
+        sBody_ += new String(collected.toByteArray(), UTF_8);
+
+        // once we are done with the body, parse the parameters
+        for (String pair : sBody_.split("&"))
+        {
+            if (pair.length() == 0)
+            {
+                continue;
+            }
+            int eq = pair.indexOf('=');
+            String name = (eq < 0) ? pair : pair.substring(0, eq);
+            String value = (eq < 0) ? "" : pair.substring(eq + 1);
+            setParameter(name, value);
+        }
+    }
+
+    /*
+     * Returns the body accumulated so far (never null).
+     */
+    public String getBody()
+    {
+        return sBody_;
+    }
+
+    /*
+     * Records the components of the request's start line.
+     */
+    public void setStartLine(String method, String path, String query, String version)
+    {
+        method_ = method;
+        path_ = path;
+        query_ = query;
+        version_ = version;
+    }
+
+    /*
+     * Multi-line dump of the start line, headers and body for debugging.
+     */
+    @Override
+    public String toString()
+    {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+
+        pw.println("HttpRequest-------->");
+        pw.println("method = " + method_ + ", path = " + path_ + ", query = " + query_ + ", version = " + version_);
+        pw.println("Headers: " + headersMap_.toString());
+        pw.println("Body: " + sBody_);
+        pw.println("<--------HttpRequest");
+
+        return sw.toString();
+    }
+}
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpResponse.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpResponse.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpResponse.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Encapsulates an HTTP response.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.io.*;
+import java.nio.*;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpResponse
+{
+    private Map<String, String> headersMap_ = new HashMap<String, String>();;
+    private String sBody_ = null;
+    private String method_ = null;
+    private String path_ = null;
+    private String version_ = null;
+
+
+    public String getMethod()
+    {
+        return method_;
+    }
+    
+    public String getPath()
+    {
+        return path_;
+    }
+    
+    public String getVersion()
+    {
+        return "HTTP/1.1";
+    }
+
+    public void addHeader(String name, String value)
+    {
+        headersMap_.put(name, value);
+    }
+
+    public String getHeader(String name)
+    {
+        return headersMap_.get(name).toString();
+    }
+
+    public void setBody(List<ByteBuffer> bodyBuffers)
+    {
+        StringBuffer sb = new StringBuffer();
+        while(bodyBuffers.size() > 0)
+        {
+            sb.append(bodyBuffers.remove(0).asCharBuffer().toString());
+        }
+        sBody_ = sb.toString();
+    }
+    
+    public String getBody()
+    {
+        return sBody_;
+    }
+    
+    public void setStartLine(String method, String path, String version)
+    {
+        method_ = method;
+        path_ = path;
+        version_ = version;
+    }    
+
+    public String toString()
+    {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        
+        pw.println("HttpResponse-------->");
+        pw.println("method = " + method_ + ", path = " + path_ + ", version = " + version_);
+        pw.println("Headers: " + headersMap_.toString());
+        pw.println("Body: " + sBody_);
+        pw.println("<--------HttpResponse");
+        
+        return sw.toString();
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpStartLineParser.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpStartLineParser.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpStartLineParser.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpStartLineParser.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Incrementally parses the start line of an HTTP request.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Incremental, state-machine based parser for the first line of an HTTP
+ * request, e.g. "GET /some/path?param=value HTTP/1.1\r\n".
+ *
+ * The line is split into method, path, query and version. "%XX" escapes
+ * in the path and in the query are decoded while reading, and '+' in the
+ * query is turned into a space. When the terminating "\r\n" has been seen
+ * the registered Callback (if any) is invoked with the four components.
+ *
+ * The parse state is kept in fields between calls, so the line may be fed
+ * in several pieces: onMoreBytes()/onMoreBytesNew() return false when they
+ * run out of input before the line is complete and can be called again
+ * with more data. After a complete line the parser resets itself on the
+ * next call.
+ *
+ * @author kranganathan
+ */
+public class HttpStartLineParser
+{
+    // receiver of the parsed start line; finishLine_() tolerates null
+    private Callback callback_;
+
+    /*
+     * Notified once per completely parsed start line.
+     */
+    public interface Callback
+    {
+        void onStartLine(String method, String path, String query, String version);
+    };
+
+    public HttpStartLineParser(Callback cb)
+    {
+        callback_ = cb;
+    }
+
+    /*
+     * States of the parser. TO_RESET is the initial state and the state
+     * entered after a complete line; it makes the next onMoreBytes* call
+     * start with resetParserState().
+     */
+    private enum StartLineParseState
+    {
+        EATING_WHITESPACE,
+        READING_METHOD,
+        READING_PATH,
+        READING_QUERY,
+        DECODING_FIRST_CHAR,
+        DECODING_SECOND_CHAR,
+        READING_VERSION,
+        CHECKING_EOL,
+        TO_RESET
+    }
+
+    // state the parser is currently in
+    private StartLineParseState parseState_ = StartLineParseState.TO_RESET;
+    // state to continue with once EATING_WHITESPACE or the two DECODING_*
+    // states are done
+    private StartLineParseState nextState_;
+    private StringBuilder httpMethod_ = new StringBuilder(32);
+    private StringBuilder httpPath_ = new StringBuilder();
+    private StringBuilder httpQuery_ = new StringBuilder(32);
+    private StringBuilder httpVersion_ = new StringBuilder();
+
+    // we will decode things of the form %{2 digit hex number} and this is a
+    // temporary holder for the leftmost digit's value (already multiplied
+    // by 16) as the second digit is being read
+    private int encodedValue_;
+    // this is a pointer to one of httpPath_ or httpQuery_ (the only buffers
+    // it is ever assigned) so that the decoded character can be appended to
+    // the correct buffer
+    private StringBuilder encodeTo_;
+
+    /*
+     * Clears the four component buffers and puts the parser in the state
+     * "skip leading spaces, then read the method".
+     */
+    public void resetParserState() {
+        httpMethod_.setLength(0);
+        httpPath_.setLength(0);
+        httpQuery_.setLength(0);
+        httpVersion_.setLength(0);
+
+        parseState_ = StartLineParseState.EATING_WHITESPACE;
+        nextState_ = StartLineParseState.READING_METHOD;
+    }
+
+    /*
+     * Hands the collected components to the callback, if one was given.
+     */
+    private void finishLine_()
+    {
+        if (callback_ != null)
+        {
+            callback_.onStartLine(
+                    httpMethod_.toString(),
+                    httpPath_.toString(),
+                    httpQuery_.toString(),
+                    httpVersion_.toString()
+                    );
+        }
+    }
+
+    /*
+     * Returns the value (0-15) of a single hex digit character.
+     * NOTE(review): a character that is not a hex digit yields 0 rather
+     * than an error, so malformed escapes such as "%zz" are accepted and
+     * decode to character 0.
+     */
+    private static int decodeHex(int hex)
+    {
+        if (hex >= '0' && hex <= '9')
+        {
+            return hex-'0';
+        }
+        else if (hex >= 'a' && hex <= 'f')
+        {
+            return hex-'a'+10;
+        }
+        else if (hex >= 'A' && hex <= 'F')
+        {
+            return hex-'A'+10;
+        }
+        else
+        {
+            return 0;
+        }
+    }
+
+    /*
+     * Consumes bytes from the stream, one at a time, for as long as
+     * in.available() reports data.
+     *
+     * Returns true as soon as the terminating "\r\n" has been read (the
+     * callback has been invoked by then), false if the available bytes
+     * ran out first. Throws HttpParsingException if '\r' is followed by
+     * anything other than '\n'.
+     *
+     * The whitespace-skipping state pushes the first non-space byte back
+     * with in.reset(), so the stream has to support mark/reset.
+     */
+    public boolean onMoreBytes(InputStream in) throws HttpParsingException, IOException
+    {
+        int got;
+
+        if (parseState_ == StartLineParseState.TO_RESET)
+        {
+            resetParserState();
+        }
+
+        while (in.available() > 0)
+        {
+            // remember the position so EATING_WHITESPACE can un-read this byte
+            in.mark(1);
+            got = in.read();
+
+            switch (parseState_)
+            {
+                case EATING_WHITESPACE:
+                        switch (got)
+                        {
+                            case ' ':
+                                        break;
+                            default:
+                                        // first non-space byte: push it back and let
+                                        // the follow-up state read it again
+                                        in.reset();
+                                        parseState_ = nextState_;
+                                        break;
+                        }
+                        break;
+
+                case READING_METHOD:
+                        switch (got)
+                        {
+                            case ' ':
+                                    parseState_ = StartLineParseState.EATING_WHITESPACE;
+                                    nextState_ = StartLineParseState.READING_PATH;
+                                    break;
+                            default:
+                                    httpMethod_.append((char) got);
+                                    break;
+                        }
+                        break;
+
+                case READING_PATH:
+                        switch (got)
+                        {
+                            case '\r':
+                                    // line ends right after the path: the version stays empty
+                                    parseState_ = StartLineParseState.CHECKING_EOL;
+                                    break;
+                            case '%':
+                                    // decode the next two hex digits into the path,
+                                    // then come back to this state
+                                    encodeTo_ = httpPath_;
+                                    nextState_ = parseState_;
+                                    parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+                                    break;
+                            case ' ':
+                                    parseState_ = StartLineParseState.EATING_WHITESPACE;
+                                    nextState_ = StartLineParseState.READING_VERSION;
+                                    break;
+                            case '?':
+                                    // the '?' itself is not stored in path or query
+                                    parseState_ = StartLineParseState.READING_QUERY;
+                                    break;
+                            default:
+                                    httpPath_.append((char) got);
+                                    break;
+                        }
+                        break;
+
+                case READING_QUERY:
+                        switch (got)
+                        {
+                            case '\r':
+                                    parseState_ = StartLineParseState.CHECKING_EOL;
+                                    break;
+                            case '%':
+                                    encodeTo_ = httpQuery_;
+                                    nextState_ = parseState_;
+                                    parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+                                    break;
+                            case ' ':
+                                    parseState_ = StartLineParseState.EATING_WHITESPACE;
+                                    nextState_ = StartLineParseState.READING_VERSION;
+                                    break;
+                            case '+':
+                                    // form encoding: '+' stands for a space
+                                    httpQuery_.append(' ');
+                                    break;
+                            default:
+                                    httpQuery_.append((char) got);
+                                    break;
+                        }
+                        break;
+
+                case DECODING_FIRST_CHAR:
+                        // high nibble of the %XX escape
+                        encodedValue_ = decodeHex(got) * 16;
+                        parseState_ = StartLineParseState.DECODING_SECOND_CHAR;
+                        break;
+
+                case DECODING_SECOND_CHAR:
+                        // low nibble; append the decoded character and resume
+                        // the state that saw the '%'
+                        encodeTo_.append((char) (decodeHex(got) + encodedValue_));
+                        parseState_ = nextState_;
+                        break;
+
+                case READING_VERSION:
+                        switch (got)
+                        {
+                            case '\r':
+                                    parseState_ = StartLineParseState.CHECKING_EOL;
+                                    break;
+                            default:
+                                    httpVersion_.append((char) got);
+                                    break;
+                        }
+                        break;
+
+                case CHECKING_EOL:
+                        // a '\r' was seen; only '\n' may follow.
+                        // No break needed: both branches leave the method.
+                        switch (got)
+                        {
+                            case '\n':
+                                    finishLine_();
+                                    parseState_ = StartLineParseState.TO_RESET;
+                                    return true;
+                            default:
+                                    throw new HttpParsingException();
+                        }
+
+                default:
+                        throw new HttpParsingException();
+            }
+        }
+
+        return false;
+    }
+
+    /*
+     * Same state machine as onMoreBytes(), but working directly on a
+     * ByteBuffer. Bytes are read with absolute gets between the buffer's
+     * position and limit; the buffer's position is moved past the
+     * consumed bytes when the method returns normally.
+     *
+     * Returns true when the terminating "\r\n" has been read (the callback
+     * has been invoked by then), false if the buffer was exhausted first.
+     * Throws HttpParsingException if '\r' is followed by anything other
+     * than '\n'; in that case the buffer's position is not updated.
+     *
+     * NOTE(review): buffer.get() returns a signed byte, so bytes >= 0x80
+     * become negative ints here and their (char) cast differs from what
+     * onMoreBytes() produces for the same input - confirm whether
+     * non-ASCII input matters.
+     */
+    public boolean onMoreBytesNew(ByteBuffer buffer) throws HttpParsingException, IOException
+    {
+        int got;
+        int limit = buffer.limit();
+        int pos = buffer.position();
+
+        if (parseState_ == StartLineParseState.TO_RESET)
+        {
+            resetParserState();
+        }
+
+        while(pos < limit)
+        {
+            switch(parseState_)
+            {
+                case EATING_WHITESPACE:
+                        // skip spaces; the empty-bodied loop stops at the first
+                        // non-space byte or at the limit
+                        while((char)buffer.get(pos) == ' ' && ++pos < limit);
+                        // only switch state if a non-space byte was actually found
+                        if(pos < limit)
+                            parseState_ = nextState_;
+                        break;
+
+                case READING_METHOD:
+                        while(pos < limit && (got = buffer.get(pos)) != ' ')
+                        {
+                            httpMethod_.append((char)got);
+                            pos++;
+                        }
+
+                        // stopped on a space (left unconsumed; EATING_WHITESPACE skips it)
+                        if(pos < limit)
+                        {
+                            parseState_ = StartLineParseState.EATING_WHITESPACE;
+                            nextState_ = StartLineParseState.READING_PATH;
+                        }
+                        break;
+
+                case READING_PATH:
+                        // loop ends when the buffer is exhausted or a delimiter
+                        // changed the state
+                        while(pos < limit && parseState_ == StartLineParseState.READING_PATH)
+                        {
+                            got = buffer.get(pos++);
+
+                            switch (got)
+                            {
+                                case '\r':
+                                        // line ends right after the path: the version stays empty
+                                        parseState_ = StartLineParseState.CHECKING_EOL;
+                                        break;
+                                case '%':
+                                        // decode the next two hex digits into the path,
+                                        // then come back to this state
+                                        encodeTo_ = httpPath_;
+                                        nextState_ = parseState_;
+                                        parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+                                        break;
+                                case ' ':
+                                        parseState_ = StartLineParseState.EATING_WHITESPACE;
+                                        nextState_ = StartLineParseState.READING_VERSION;
+                                        break;
+                                case '?':
+                                        // the '?' itself is not stored in path or query
+                                        parseState_ = StartLineParseState.READING_QUERY;
+                                        break;
+                                default:
+                                        httpPath_.append((char) got);
+                                        break;
+                            }
+                        }
+                        break;
+
+                case READING_QUERY:
+                        while(pos < limit && parseState_ == StartLineParseState.READING_QUERY)
+                        {
+                            got = buffer.get(pos++);
+
+                            switch (got)
+                            {
+                                case '\r':
+                                        parseState_ = StartLineParseState.CHECKING_EOL;
+                                        break;
+                                case '%':
+                                        encodeTo_ = httpQuery_;
+                                        nextState_ = parseState_;
+                                        parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+                                        break;
+                                case ' ':
+                                        parseState_ = StartLineParseState.EATING_WHITESPACE;
+                                        nextState_ = StartLineParseState.READING_VERSION;
+                                        break;
+                                case '+':
+                                        // form encoding: '+' stands for a space
+                                        httpQuery_.append(' ');
+                                        break;
+                                default:
+                                        httpQuery_.append((char) got);
+                                        break;
+                            }
+                        }
+                        break;
+
+                case DECODING_FIRST_CHAR:
+                        // high nibble of the %XX escape
+                        got = (int)buffer.get(pos++);
+                        encodedValue_ = decodeHex(got) * 16;
+                        parseState_ = StartLineParseState.DECODING_SECOND_CHAR;
+                        break;
+
+                case DECODING_SECOND_CHAR:
+                        // low nibble; append the decoded character and resume
+                        // the state that saw the '%'
+                        got = (int)buffer.get(pos++);
+                        encodeTo_.append((char) (decodeHex(got) + encodedValue_));
+                        parseState_ = nextState_;
+                        break;
+
+                case READING_VERSION:
+                        while(pos < limit && (got = buffer.get(pos)) != '\r' )
+                        {
+                            httpVersion_.append((char)got);
+                            pos++;
+                        }
+                        if(pos < limit)
+                        {
+                            parseState_ = StartLineParseState.CHECKING_EOL;
+                            pos++; // skipping '\r'
+                        }
+                        break;
+
+                case CHECKING_EOL:
+                        // a '\r' was seen; only '\n' may follow.
+                        // No break needed: both branches leave the method.
+                        switch (buffer.get(pos++))
+                        {
+                            case '\n':
+                                    finishLine_();
+                                    parseState_ = StartLineParseState.TO_RESET;
+                                    buffer.position(pos);
+                                    return true;  //could have reached limit here
+                            default:
+                                    throw new HttpParsingException();
+                        }
+
+                default:
+                        throw new HttpParsingException();
+            }
+        }
+
+        // out of input before the end of the line: remember how far we got
+        buffer.position(pos);
+        return false;
+    }
+}
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpWriteResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpWriteResponse.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpWriteResponse.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpWriteResponse.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class writes the HTTP 1.1 responses back to the client.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.io.*;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpWriteResponse
+{
+    private HttpRequest httpRequest_ = null;
+    private StringBuilder body_ = new StringBuilder();
+
+    public HttpWriteResponse(HttpRequest httpRequest)
+    {
+        httpRequest_ = httpRequest;
+    }
+
+    public void println(String responseLine)
+    {
+        if(responseLine != null)
+        {
+            body_.append(responseLine);
+            body_.append( System.getProperty("line.separator"));
+        }
+    }
+
+    public ByteBuffer flush() throws Exception
+    {
+        StringBuilder sb = new StringBuilder();
+        // write out the HTTP response headers first
+        sb.append(httpRequest_.getVersion() + " 200 OK\r\n");
+        sb.append("Content-Type: text/html\r\n");
+        if(body_.length() > 0)
+        	sb.append("Content-Length: " + body_.length() + "\r\n");
+        sb.append("Cache-Control: no-cache\r\n");
+        sb.append("Pragma: no-cache\r\n");
+
+        // terminate the headers
+        sb.append("\r\n");
+
+        // now write out the HTTP response body
+        if(body_.length() > 0)
+            sb.append(body_.toString());
+
+        // terminate the body
+        //sb.append("\r\n");
+        //sb.append("\r\n");
+        ByteBuffer buffer = ByteBuffer.wrap(sb.toString().getBytes());
+        return buffer;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentLengthState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentLengthState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentLengthState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentLengthState.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+
+import org.apache.cassandra.utils.FBUtilities;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ *
+ * Reader state that collects a four-byte length field and then hands
+ * over to the CONTENT state, telling it how many bytes to expect.
+ */
+class ContentLengthState extends StartState
+{
+    // size of the length field that precedes the content
+    private static final int LENGTH_FIELD_BYTES = 4;
+
+    // receives the bytes of the length field; reused for every message
+    private ByteBuffer buffer_;
+
+    ContentLengthState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(LENGTH_FIELD_BYTES);
+    }
+
+    /*
+     * Delegates to doRead() with the length buffer.
+     * (doRead is inherited and not visible here - presumably it fills the
+     * buffer from the socket and signals an incomplete read via
+     * ReadNotCompleteException; confirm in StartState.)
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /*
+     * Converts the collected bytes to an int and switches the reader to
+     * the CONTENT state. A CONTENT state already registered with the
+     * reader is reused and given the new length; otherwise one is created
+     * and registered. Finally the length buffer is cleared for the next use.
+     */
+    public void morphState() throws IOException
+    {
+        int contentLength = FBUtilities.byteArrayToInt(buffer_.array());
+        StartState contentState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT);
+        if ( contentState != null )
+        {
+            // reuse the cached state, only updating the expected length
+            contentState.setContextData(contentLength);
+        }
+        else
+        {
+            contentState = new ContentState(stream_, contentLength);
+            stream_.putSocketState( TcpReader.TcpReaderState.CONTENT, contentState );
+        }
+        stream_.morphState( contentState );
+        buffer_.clear();
+    }
+
+    /*
+     * This state takes no context data; always throws
+     * UnsupportedOperationException.
+     */
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ContentLengthState");
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentState.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Reader state that collects a message body of a known length and then
+ * hands the bytes to the DONE state.
+ */
+class ContentState extends StartState
+{
+    /* Backing store for the body; replaced when a longer message arrives. */
+    private ByteBuffer buffer_;
+    /* Number of body bytes expected for the message being read. */
+    private int length_;
+
+    ContentState(TcpReader stream, int length)
+    {
+        super(stream);
+        length_ = length;
+        buffer_ = ByteBuffer.allocate(length);
+    }
+
+    /**
+     * Pulls body bytes from the socket via doRead().
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Copies the collected body out of the buffer and moves the reader to
+     * the DONE state, creating and caching that state on first use or
+     * re-arming the cached one with the new body.
+     */
+    public void morphState() throws IOException
+    {
+        byte[] body = toBytes();
+        StartState doneState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+        if ( doneState != null )
+        {
+            doneState.setContextData(body);
+        }
+        else
+        {
+            doneState = new DoneState(stream_, body);
+            stream_.putSocketState(TcpReader.TcpReaderState.DONE, doneState);
+        }
+        stream_.morphState(doneState);
+    }
+
+    /* Returns a copy of the first length_ bytes of the buffer. */
+    private byte[] toBytes()
+    {
+        byte[] body = new byte[length_];
+        buffer_.position(0);
+        buffer_.get(body);
+        return body;
+    }
+
+    /**
+     * Re-arms this state for the next message.
+     *
+     * @param data the new body length, as an Integer
+     */
+    public void setContextData(Object data)
+    {
+        length_ = (Integer) data;
+        buffer_.clear();
+        if ( length_ <= buffer_.capacity() )
+        {
+            /* The existing buffer is large enough; just cap it at the new length. */
+            buffer_.limit(length_);
+        }
+        else
+        {
+            buffer_ = ByteBuffer.allocate(length_);
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentStreamState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentStreamState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentStreamState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentStreamState.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.io.*;
+
+import org.apache.cassandra.db.Table;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+class ContentStreamState extends StartState
+{       
+    private static Logger logger_ = Logger.getLogger(ContentStreamState.class);
+    private static long count_ = 64*1024*1024;
+    /* Return this byte array to exit event loop */
+    private static byte[] bytes_ = new byte[1];
+    private long bytesRead_ = 0L;
+    private FileChannel fc_;
+    private StreamContextManager.StreamContext streamContext_;
+    private StreamContextManager.StreamStatus streamStatus_;
+    
+    ContentStreamState(TcpReader stream)
+    {
+        super(stream); 
+        SocketChannel socketChannel = stream.getStream();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        String remoteHost = remoteAddress.getHostName();        
+        streamContext_ = StreamContextManager.getStreamContext(remoteHost);   
+        streamStatus_ = StreamContextManager.getStreamStatus(remoteHost);
+    }
+    
+    private void createFileChannel() throws IOException
+    {
+        if ( fc_ == null )
+        {
+            logger_.debug("Creating file for " + streamContext_.getTargetFile());
+            FileOutputStream fos = new FileOutputStream( streamContext_.getTargetFile(), true );
+            fc_ = fos.getChannel();            
+        }
+    }
+
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {        
+        SocketChannel socketChannel = stream_.getStream();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        String remoteHost = remoteAddress.getHostName();  
+        createFileChannel();
+        if ( streamContext_ != null )
+        {  
+            try
+            {
+                bytesRead_ += fc_.transferFrom(socketChannel, bytesRead_, ContentStreamState.count_);
+                if ( bytesRead_ != streamContext_.getExpectedBytes() )
+                    throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+            }
+            catch ( IOException ex )
+            {
+                /* Ask the source node to re-stream this file. */
+                streamStatus_.setAction(StreamContextManager.StreamCompletionAction.STREAM);                
+                handleStreamCompletion(remoteHost);
+                /* Delete the orphaned file. */
+                File file = new File(streamContext_.getTargetFile());
+                file.delete();
+                throw ex;
+            }
+            if ( bytesRead_ == streamContext_.getExpectedBytes() )
+            {       
+                logger_.debug("Removing stream context " + streamContext_);                 
+                handleStreamCompletion(remoteHost);                              
+                bytesRead_ = 0L;
+                fc_.close();
+                morphState();
+            }                            
+        }
+        
+        return new byte[0];
+    }
+    
+    private void handleStreamCompletion(String remoteHost) throws IOException
+    {
+        /* 
+         * Streaming is complete. If all the data that has to be received inform the sender via 
+         * the stream completion callback so that the source may perform the requisite cleanup. 
+        */
+        IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
+        if ( streamComplete != null )
+        {                    
+            streamComplete.onStreamCompletion(remoteHost, streamContext_, streamStatus_);                    
+        }
+    }
+
+    public void morphState() throws IOException
+    {        
+        /* We instantiate an array of size 1 so that we can exit the event loop of the read. */                
+        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+        if ( nextState == null )
+        {
+            nextState = new DoneState(stream_, ContentStreamState.bytes_);
+            stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
+        }
+        else
+        {
+            nextState.setContextData(ContentStreamState.bytes_);
+        }
+        stream_.morphState( nextState );  
+    }
+    
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ContentStreamState");
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/DoneState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/DoneState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/DoneState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/DoneState.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Terminal reader state: returns the bytes it was handed and detaches
+ * itself from the reader.
+ */
+class DoneState extends StartState
+{
+    /* Result handed back by read(); replaced through setContextData(). */
+    private byte[] bytes_ = new byte[0];
+
+    DoneState(TcpReader stream, byte[] bytes)
+    {
+        super(stream);
+        this.bytes_ = bytes;
+    }
+
+    /**
+     * Clears the reader's current state, then returns the stored bytes.
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        this.morphState();
+        return this.bytes_;
+    }
+
+    /**
+     * Sets the reader's current state to null; there is no state after DONE.
+     */
+    public void morphState() throws IOException
+    {
+        stream_.morphState(null);
+    }
+
+    /**
+     * Replaces the bytes that the next read() will return.
+     *
+     * @param data the new result, as a byte[]
+     */
+    public void setContextData(Object data)
+    {
+        this.bytes_ = (byte[]) data;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/FastSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/FastSerializer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/FastSerializer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/FastSerializer.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * ISerializer implementation that delegates to the serializer exposed by
+ * Message.serializer().
+ */
+public class FastSerializer implements ISerializer
+{
+    /**
+     * Writes the message into a fresh DataOutputBuffer and returns the
+     * buffer's data.
+     */
+    public byte[] serialize(Message message) throws IOException
+    {
+        DataOutputBuffer out = new DataOutputBuffer();
+        Message.serializer().serialize(message, out);
+        return out.getData();
+    }
+
+    /**
+     * Reads a message back from the whole of the given byte array.
+     */
+    public Message deserialize(byte[] bytes) throws IOException
+    {
+        DataInputBuffer in = new DataInputBuffer();
+        in.reset(bytes, bytes.length);
+        return Message.serializer().deserialize(in);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ISerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ISerializer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ISerializer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ISerializer.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.Message;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Converts messages to and from their byte representation.
+ */
+public interface ISerializer
+{
+    /**
+     * @param message the message to encode
+     * @return the encoded bytes
+     * @throws IOException if encoding fails
+     */
+    byte[] serialize(Message message) throws IOException;
+
+    /**
+     * @param bytes the encoded form of a message
+     * @return the decoded message
+     * @throws IOException if decoding fails
+     */
+    Message deserialize(byte[] bytes) throws IOException;
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/IStreamComplete.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/IStreamComplete.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/IStreamComplete.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Callback for stream completion events.
+ */
+public interface IStreamComplete
+{
+    /**
+     * Called, when registered with the StreamContextManager, once the
+     * stream from a host has been completely handled.
+     *
+     * @param from the host the stream came from
+     * @param streamContext the context of the stream that was handled
+     * @param streamStatus the status recorded for that stream
+     * @throws IOException if the callback fails
+     */
+    void onStreamCompletion(String from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolHeaderState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolHeaderState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolHeaderState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolHeaderState.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Reader state that collects the four-byte protocol header, records its
+ * fields on the reader's protocol header object and selects the next state
+ * depending on whether the connection is in streaming mode.
+ */
+public class ProtocolHeaderState extends StartState
+{
+    /* Accumulates the four header bytes across partial reads. */
+    private ByteBuffer buffer_ = ByteBuffer.allocate(4);
+
+    public ProtocolHeaderState(TcpReader stream)
+    {
+        super(stream);
+    }
+
+    /**
+     * Pulls header bytes from the socket via doRead().
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Unpacks the header and moves the reader to CONTENT_STREAM when the
+     * streaming flag is set, or to CONTENT_LENGTH otherwise. The target
+     * state is created and cached on first use.
+     *
+     * @throws IOException if the header's version is newer than
+     *         MessagingService.getVersion()
+     */
+    public void morphState() throws IOException
+    {
+        int header = MessagingService.byteArrayToInt(buffer_.array());
+
+        /* Field extraction; the bit layout is whatever MessagingService.getBits defines. */
+        int serializerType = MessagingService.getBits(header, 1, 2);
+        boolean streaming = ( MessagingService.getBits(header, 3, 1) == 1 );
+        boolean listening = ( MessagingService.getBits(header, 4, 1) == 1 );
+        int version = MessagingService.getBits(header, 15, 8);
+
+        stream_.getProtocolHeader().serializerType_ = serializerType;
+        stream_.getProtocolHeader().isStreamingMode_ = streaming;
+        if ( streaming )
+            MessagingService.setStreamingMode(true);
+        stream_.getProtocolHeader().isListening_ = listening;
+        stream_.getProtocolHeader().version_ = version;
+
+        if ( version > MessagingService.getVersion() )
+        {
+            throw new IOException("Invalid version in message. Scram.");
+        }
+
+        StartState nextState;
+        if ( streaming )
+        {
+            nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_STREAM);
+            if ( nextState == null )
+            {
+                nextState = new ContentStreamState(stream_);
+                stream_.putSocketState(TcpReader.TcpReaderState.CONTENT_STREAM, nextState);
+            }
+        }
+        else
+        {
+            nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_LENGTH);
+            if ( nextState == null )
+            {
+                nextState = new ContentLengthState(stream_);
+                stream_.putSocketState(TcpReader.TcpReaderState.CONTENT_LENGTH, nextState);
+            }
+        }
+        stream_.morphState(nextState);
+        /* Make the header buffer ready for the next message. */
+        buffer_.clear();
+    }
+
+    /**
+     * This state carries no context data; always throws.
+     */
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ProtocolHeaderState");
+    }
+}
+
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolState.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Reader state that collects the 16-byte protocol preamble and validates
+ * it before the protocol header is read.
+ */
+public class ProtocolState extends StartState
+{
+    /* Accumulates the 16 preamble bytes across partial reads. */
+    private ByteBuffer buffer_ = ByteBuffer.allocate(16);
+
+    public ProtocolState(TcpReader stream)
+    {
+        super(stream);
+    }
+
+    /**
+     * Pulls preamble bytes from the socket via doRead().
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Checks the preamble with MessagingService.isProtocolValid and moves
+     * the reader to the state cached under PROTOCOL (a ProtocolHeaderState,
+     * created on first use).
+     *
+     * @throws IOException if the preamble is not valid
+     */
+    public void morphState() throws IOException
+    {
+        if ( !MessagingService.isProtocolValid(buffer_.array()) )
+        {
+            throw new IOException("Invalid protocol header. The preamble seems to be messed up.");
+        }
+
+        StartState headerState = stream_.getSocketState(TcpReader.TcpReaderState.PROTOCOL);
+        if ( headerState == null )
+        {
+            headerState = new ProtocolHeaderState(stream_);
+            stream_.putSocketState(TcpReader.TcpReaderState.PROTOCOL, headerState);
+        }
+        stream_.morphState(headerState);
+        /* Make the preamble buffer ready for the next message. */
+        buffer_.clear();
+    }
+
+    /**
+     * This state carries no context data; always throws.
+     */
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ProtocolState");
+    }
+}
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ReadNotCompleteException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ReadNotCompleteException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ReadNotCompleteException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ReadNotCompleteException.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+/**
+ * Checked exception thrown by the reader states when the bytes needed to
+ * finish the current state have not all been read yet.
+ */
+public class ReadNotCompleteException extends Exception
+{
+    /**
+     * @param message description of the incomplete read
+     */
+    ReadNotCompleteException(String message)
+    {
+        super(message);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerAttribute.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerAttribute.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerAttribute.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerAttribute.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.lang.annotation.*;
+
+/**
+ * Annotation carrying a SerializerType; retained at runtime so it can be
+ * read reflectively.
+ */
+@Retention(value = RetentionPolicy.RUNTIME)
+public @interface SerializerAttribute
+{
+    /** The serializer type this annotation designates. */
+    SerializerType value();
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerType.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+/**
+ * The serializer kinds known to the messaging layer. The declaration
+ * order is part of the contract and must not change.
+ */
+public enum SerializerType
+{
+    BINARY, JAVA, XML, JSON
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StartState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StartState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StartState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StartState.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Base class of the reader's states. Each state reads its part of a
+ * message from the reader's socket channel and then moves the reader on
+ * to the next state.
+ */
+public abstract class StartState
+{
+    /* The reader that owns this state. */
+    protected TcpReader stream_;
+
+    public StartState(TcpReader stream)
+    {
+        stream_ = stream;
+    }
+
+    /** Reads this state's part of the message. */
+    public abstract byte[] read() throws IOException, ReadNotCompleteException;
+    /** Moves the reader on to the state that follows this one. */
+    public abstract void morphState() throws IOException;
+    /** Supplies state-specific data before the state is (re)used. */
+    public abstract void setContextData(Object data);
+
+    /**
+     * Reads from the socket into the given buffer and, once the buffer is
+     * full, calls morphState().
+     *
+     * @param buffer the buffer to fill
+     * @return an empty array when the buffer has been filled
+     * @throws IOException if the channel reports end-of-stream while the
+     *         buffer still has room
+     * @throws ReadNotCompleteException if the buffer is not yet full
+     */
+    protected byte[] doRead(ByteBuffer buffer) throws IOException, ReadNotCompleteException
+    {
+        SocketChannel socketChannel = stream_.getStream();
+        int bytesRead = socketChannel.read(buffer);
+        if ( buffer.remaining() > 0 )
+        {
+            if ( bytesRead == -1 )
+            {
+                throw new IOException("Reached an EOL or something bizzare occured. Reading from: " + socketChannel.socket().getInetAddress() + " BufferSizeRemaining: " + buffer.remaining());
+            }
+            throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+        }
+        morphState();
+        return new byte[0];
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StreamContextManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StreamContextManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StreamContextManager.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.BootstrapInitiateMessage;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StreamContextManager
+{
+    private static Logger logger_ = Logger.getLogger(StreamContextManager.class);
+    
+    /**
+     * Action attached to a {@link StreamStatus}. StreamStatusSerializer writes
+     * the ordinal of the constant, so the declaration order is part of the
+     * serialized format and must not change.
+     */
+    public enum StreamCompletionAction
+    {
+        DELETE,
+        STREAM
+    }
+    
+    /**
+     * Pairs the name of a target file with the number of bytes expected for
+     * it. Two contexts are equal when they name the same target file; the
+     * expected byte count does not take part in equality or hashing.
+     */
+    public static class StreamContext implements Serializable
+    {
+        private static Logger logger_ = Logger.getLogger(StreamContextManager.StreamContext.class);
+        private static ICompactSerializer<StreamContext> serializer_;
+
+        static
+        {
+            serializer_ = new StreamContextSerializer();
+        }
+
+        /** @return the shared serializer for stream contexts. */
+        public static ICompactSerializer<StreamContext> serializer()
+        {
+            return serializer_;
+        }
+
+        private String targetFile_;
+        private long expectedBytes_;
+
+        /**
+         * @param targetFile name of the target file.
+         * @param expectedBytes number of bytes expected for that file.
+         */
+        public StreamContext(String targetFile, long expectedBytes)
+        {
+            targetFile_ = targetFile;
+            expectedBytes_ = expectedBytes;
+        }
+
+        public String getTargetFile()
+        {
+            return targetFile_;
+        }
+
+        public void setTargetFile(String file)
+        {
+            targetFile_ = file;
+        }
+
+        public long getExpectedBytes()
+        {
+            return expectedBytes_;
+        }
+
+        /**
+         * @return true if the other object is a StreamContext naming the
+         *         same target file (both names being null also counts).
+         */
+        @Override
+        public boolean equals(Object o)
+        {
+            if ( !(o instanceof StreamContext) )
+                return false;
+
+            StreamContext rhs = (StreamContext)o;
+            if ( targetFile_ == null )
+                return rhs.targetFile_ == null;
+            return targetFile_.equals(rhs.targetFile_);
+        }
+
+        /**
+         * Hashes the target file only. equals() ignores the expected byte
+         * count, so including it here would give equal objects different
+         * hash codes and break hash based collections.
+         */
+        @Override
+        public int hashCode()
+        {
+            return ( targetFile_ == null ) ? 0 : targetFile_.hashCode();
+        }
+
+        /** @return the target file and expected byte count joined by ':'. */
+        @Override
+        public String toString()
+        {
+            return targetFile_ + ":" + expectedBytes_;
+        }
+    }
+    
+    /**
+     * Compact serializer for {@link StreamContext}: the target file name is
+     * written as UTF, followed by the expected byte count as a long.
+     */
+    public static class StreamContextSerializer implements ICompactSerializer<StreamContext>
+    {
+        /** Writes the target file name and then the expected byte count. */
+        public void serialize(StreamContextManager.StreamContext sc, DataOutputStream dos) throws IOException
+        {
+            final String file = sc.targetFile_;
+            final long bytes = sc.expectedBytes_;
+            dos.writeUTF(file);
+            dos.writeLong(bytes);
+        }
+
+        /** Reads the fields back in the order in which serialize() wrote them. */
+        public StreamContextManager.StreamContext deserialize(DataInputStream dis) throws IOException
+        {
+            /* arguments are evaluated left to right: name first, then count */
+            return new StreamContext(dis.readUTF(), dis.readLong());
+        }
+    }
+    
+    /**
+     * Status record for one file: its name, the number of bytes expected
+     * for it and a {@link StreamCompletionAction}, which starts out as DELETE.
+     */
+    public static class StreamStatus
+    {
+        private static ICompactSerializer<StreamStatus> serializer_ = new StreamStatusSerializer();
+
+        /** @return the shared serializer for stream statuses. */
+        public static ICompactSerializer<StreamStatus> serializer()
+        {
+            return serializer_;
+        }
+
+        private String file_;
+        private long expectedBytes_;
+        private StreamCompletionAction action_;
+
+        /**
+         * @param file name of the file.
+         * @param expectedBytes number of bytes expected for that file.
+         */
+        public StreamStatus(String file, long expectedBytes)
+        {
+            this.file_ = file;
+            this.expectedBytes_ = expectedBytes;
+            this.action_ = StreamCompletionAction.DELETE;
+        }
+
+        public String getFile()
+        {
+            return file_;
+        }
+
+        public long getExpectedBytes()
+        {
+            return expectedBytes_;
+        }
+
+        /* package private: only code in this package may change the action */
+        void setAction(StreamContextManager.StreamCompletionAction action)
+        {
+            this.action_ = action;
+        }
+
+        public StreamContextManager.StreamCompletionAction getAction()
+        {
+            return action_;
+        }
+    }
+    
+    /**
+     * Compact serializer for {@link StreamStatus}: the file name (UTF), the
+     * expected byte count (long) and the ordinal of the completion action (int).
+     */
+    public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
+    {
+        /** Writes the file name, the expected byte count and the action ordinal. */
+        public void serialize(StreamStatus streamStatus, DataOutputStream dos) throws IOException
+        {
+            dos.writeUTF(streamStatus.getFile());
+            dos.writeLong(streamStatus.getExpectedBytes());
+            dos.writeInt(streamStatus.getAction().ordinal());
+        }
+
+        /**
+         * Reads a status in the format produced by serialize().
+         *
+         * @throws IOException if the stream cannot be read, or if it carries
+         *         an ordinal that matches no StreamCompletionAction.
+         */
+        public StreamStatus deserialize(DataInputStream dis) throws IOException
+        {
+            String targetFile = dis.readUTF();
+            long expectedBytes = dis.readLong();
+            int ordinal = dis.readInt();
+
+            /*
+             * Reject an unknown ordinal instead of ignoring it: a freshly
+             * built StreamStatus defaults to DELETE, so ignoring bad input
+             * would silently turn it into a DELETE action.
+            */
+            StreamCompletionAction[] actions = StreamCompletionAction.values();
+            if ( ordinal < 0 || ordinal >= actions.length )
+            {
+                throw new IOException("Unknown StreamCompletionAction ordinal: " + ordinal);
+            }
+
+            StreamStatus streamStatus = new StreamStatus(targetFile, expectedBytes);
+            streamStatus.setAction(actions[ordinal]);
+            return streamStatus;
+        }
+    }
+    
+    /**
+     * Wraps a {@link StreamStatus} so that it can be serialized and shipped
+     * inside a {@link Message}.
+     */
+    public static class StreamStatusMessage implements Serializable
+    {
+        private static ICompactSerializer<StreamStatusMessage> serializer_ = new StreamStatusMessageSerializer();
+
+        /** @return the shared serializer for stream status messages. */
+        public static ICompactSerializer<StreamStatusMessage> serializer()
+        {
+            return serializer_;
+        }
+
+        /**
+         * Serializes the given status message into a byte array and wraps it
+         * in a Message that originates from the local storage endpoint and
+         * is addressed to StorageService.bootStrapTerminateVerbHandler_.
+         *
+         * @param streamStatusMessage the status message to ship.
+         * @return a Message whose body holds the serialized bytes.
+         * @throws IOException if serialization fails.
+         */
+        public static Message makeStreamStatusMessage(StreamStatusMessage streamStatusMessage) throws IOException
+        {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            DataOutputStream out = new DataOutputStream( buffer );
+            serializer().serialize(streamStatusMessage, out);
+            return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapTerminateVerbHandler_, new Object[]{buffer.toByteArray()});
+        }
+
+        /* The wrapped status; read directly by StreamStatusMessageSerializer. */
+        protected StreamContextManager.StreamStatus streamStatus_;
+
+        public StreamStatusMessage(StreamContextManager.StreamStatus streamStatus)
+        {
+            this.streamStatus_ = streamStatus;
+        }
+
+        public StreamContextManager.StreamStatus getStreamStatus()
+        {
+            return streamStatus_;
+        }
+    }
+    
+    /**
+     * Compact serializer for {@link StreamStatusMessage}. The message has no
+     * fields of its own on the wire; only the wrapped StreamStatus is written.
+     */
+    public static class StreamStatusMessageSerializer implements ICompactSerializer<StreamStatusMessage>
+    {
+        /** Delegates to the StreamStatus serializer for the wrapped status. */
+        public void serialize(StreamStatusMessage streamStatusMessage, DataOutputStream dos) throws IOException
+        {
+            final StreamContextManager.StreamStatus wrapped = streamStatusMessage.streamStatus_;
+            StreamStatus.serializer().serialize(wrapped, dos);
+        }
+
+        /** Reads a StreamStatus and wraps it in a new message. */
+        public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
+        {
+            return new StreamStatusMessage(StreamStatus.serializer().deserialize(dis));
+        }
+    }
+        
+    /*
+     * Stream contexts still pending, keyed by the host that is the source of
+     * the stream. getStreamContext() removes the key once its list is empty,
+     * which is what isDone() relies on.
+    */
+    public static Map<String, List<StreamContext>> ctxBag_ = new Hashtable<String, List<StreamContext>>();  
+    /*
+     * Status of the streams that need to be sent back to the source, keyed
+     * the same way; filled together with ctxBag_ by addStreamContext().
+    */
+    public static Map<String, List<StreamStatus>> streamStatusBag_ = new Hashtable<String, List<StreamStatus>>();
+    /*
+     * Callback handler per endpoint to notify the app that a stream from a
+     * given endpoint has been handled.
+     * NOTE(review): this is a plain HashMap and the field is public, so only
+     * access through the synchronized methods of this class is guarded.
+    */
+    public static Map<String, IStreamComplete> streamNotificationHandlers_ = new HashMap<String, IStreamComplete>();
+    
+    /**
+     * Removes and returns the oldest pending stream context for the key.
+     * When that was the last one, the key itself is dropped from ctxBag_.
+     *
+     * @param key key under which the contexts were added.
+     * @return the first context in the key's list.
+     * @throws IllegalStateException if nothing is registered for the key.
+     */
+    public static synchronized StreamContext getStreamContext(String key)
+    {
+        final List<StreamContext> pending = ctxBag_.get(key);
+        if ( pending == null )
+        {
+            throw new IllegalStateException("Streaming context has not been set.");
+        }
+
+        final StreamContext head = pending.remove(0);
+        if ( pending.isEmpty() )
+        {
+            ctxBag_.remove(key);
+        }
+        return head;
+    }
+    
+    /**
+     * Removes and returns the oldest pending stream status for the key.
+     * When that was the last one, the key itself is dropped from
+     * streamStatusBag_.
+     *
+     * @param key key under which the statuses were added.
+     * @return the first status in the key's list.
+     * @throws IllegalStateException if nothing is registered for the key.
+     */
+    public static synchronized StreamStatus getStreamStatus(String key)
+    {
+        final List<StreamStatus> pending = streamStatusBag_.get(key);
+        if ( pending == null )
+        {
+            throw new IllegalStateException("Streaming status has not been set.");
+        }
+
+        final StreamStatus head = pending.remove(0);
+        if ( pending.isEmpty() )
+        {
+            streamStatusBag_.remove(key);
+        }
+        return head;
+    }
+    
+    /**
+     * Helps determine whether the StreamCompletionHandler needs to be
+     * invoked for the data being streamed from a source.
+     *
+     * @param key key under which the stream contexts were added.
+     * @return true if ctxBag_ holds no entry for the key.
+     */
+    public static synchronized boolean isDone(String key)
+    {
+        final List<StreamContext> pending = ctxBag_.get(key);
+        return pending == null;
+    }
+    
+    /**
+     * @param key key the handler was registered under.
+     * @return the registered handler, or null if there is none.
+     */
+    public static synchronized IStreamComplete getStreamCompletionHandler(String key)
+    {
+        final IStreamComplete handler = streamNotificationHandlers_.get(key);
+        return handler;
+    }
+    
+    /**
+     * Forgets the handler registered under the key, if any.
+     *
+     * @param key key the handler was registered under.
+     */
+    public static synchronized void removeStreamCompletionHandler(String key)
+    {
+        streamNotificationHandlers_.remove(key);
+    }
+    
+    /**
+     * Registers a handler under the key, replacing any earlier one.
+     *
+     * @param key key to register the handler under.
+     * @param streamComplete the handler to store.
+     */
+    public static synchronized void registerStreamCompletionHandler(String key, IStreamComplete streamComplete)
+    {
+        streamNotificationHandlers_.put(key, streamComplete);
+    }
+    
+    /**
+     * Appends a stream context and its matching status to the lists kept
+     * for the key, creating either list on first use.
+     *
+     * @param key key to file both objects under.
+     * @param streamContext context to append to ctxBag_.
+     * @param streamStatus status to append to streamStatusBag_.
+     */
+    public static synchronized void addStreamContext(String key, StreamContext streamContext, StreamStatus streamStatus)
+    {
+        /* Record the stream context */
+        List<StreamContext> contexts = ctxBag_.get(key);
+        if ( contexts == null )
+        {
+            contexts = new ArrayList<StreamContext>();
+            ctxBag_.put(key, contexts);
+        }
+        contexts.add(streamContext);
+
+        /* Record the stream status for this stream context */
+        List<StreamStatus> statuses = streamStatusBag_.get(key);
+        if ( statuses == null )
+        {
+            statuses = new ArrayList<StreamStatus>();
+            streamStatusBag_.put(key, statuses);
+        }
+        statuses.add(streamStatus);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/TcpReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/TcpReader.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/TcpReader.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/TcpReader.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+
+import org.apache.cassandra.net.ProtocolHeader;
+import org.apache.cassandra.net.TcpConnection;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpReader
+{
+    public static enum TcpReaderState
+    {
+        START,
+        PREAMBLE,
+        PROTOCOL,
+        CONTENT_LENGTH,
+        CONTENT,
+        CONTENT_STREAM,
+        DONE
+    }
+    
+    private Map<TcpReaderState, StartState> stateMap_ = new HashMap<TcpReaderState, StartState>();
+    private TcpConnection connection_;
+    private StartState socketState_;
+    private ProtocolHeader protocolHeader_;
+    
+    public TcpReader(TcpConnection connection)
+    {
+        connection_ = connection;        
+    }
+    
+    public StartState getSocketState(TcpReaderState state)
+    {
+        return stateMap_.get(state);
+    }
+    
+    public void putSocketState(TcpReaderState state, StartState socketState)
+    {
+        stateMap_.put(state, socketState);
+    } 
+    
+    public void resetState()
+    {
+        StartState nextState = stateMap_.get(TcpReaderState.PREAMBLE);
+        if ( nextState == null )
+        {
+            nextState = new ProtocolState(this);
+            stateMap_.put(TcpReaderState.PREAMBLE, nextState);
+        }
+        socketState_ = nextState;
+    }
+    
+    public void morphState(StartState state)
+    {        
+        socketState_ = state;
+        if ( protocolHeader_ == null )
+            protocolHeader_ = new ProtocolHeader();
+    }
+    
+    public ProtocolHeader getProtocolHeader()
+    {
+        return protocolHeader_;
+    }
+    
+    public SocketChannel getStream()
+    {
+        return connection_.getSocketChannel();
+    }
+    
+    public byte[] read() throws IOException
+    {
+        byte[] bytes = new byte[0];      
+        while ( socketState_ != null )
+        {
+            try
+            {                                                                      
+                bytes = socketState_.read();
+            }
+            catch ( ReadNotCompleteException e )
+            {                
+                break;
+            }
+        }
+        return bytes;
+    }    
+    
+    public static void main(String[] args) throws Throwable
+    {
+        Map<TcpReaderState, StartState> stateMap = new HashMap<TcpReaderState, StartState>();
+        stateMap.put(TcpReaderState.CONTENT, new ContentState(null, 10));
+        stateMap.put(TcpReaderState.START, new ProtocolState(null));
+        stateMap.put(TcpReaderState.CONTENT_LENGTH, new ContentLengthState(null));
+        
+        StartState state = stateMap.get(TcpReaderState.CONTENT);
+        System.out.println( state.getClass().getName() );
+        state = stateMap.get(TcpReaderState.CONTENT_LENGTH);
+        System.out.println( state.getClass().getName() );
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/IMessageSink.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/IMessageSink.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/IMessageSink.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.sink;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+/**
+ * A hook through which messages are passed, see {@link SinkManager}.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IMessageSink
+{
+    /**
+     * Handles a message.
+     *
+     * @param message the message to handle.
+     * @return the message to pass on; SinkManager stops consulting further
+     *         sinks as soon as a sink returns null.
+     */
+    Message handleMessage(Message message);
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/SinkManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/SinkManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/SinkManager.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.sink;
+
+import java.util.*;
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Keeps an ordered list of {@link IMessageSink}s and runs messages through
+ * it, forwards for client messages and backwards for server messages.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SinkManager
+{
+    /* registered sinks, in registration order */
+    private static LinkedList<IMessageSink> messageSinks_ = new LinkedList<IMessageSink>();
+
+    /** @return true if at least one sink has been registered. */
+    public static boolean isInitialized()
+    {
+        return !messageSinks_.isEmpty();
+    }
+
+    /** Appends a sink to the end of the list. */
+    public static void addMessageSink(IMessageSink ms)
+    {
+        messageSinks_.add(ms);
+    }
+
+    /** Removes every registered sink. */
+    public static void clearSinks()
+    {
+        messageSinks_.clear();
+    }
+
+    /**
+     * Passes the message through the sinks in registration order, feeding
+     * each sink the result of the previous one.
+     *
+     * @param message the message to process.
+     * @return the result of the last sink, or null as soon as a sink returns null.
+     */
+    public static Message processClientMessageSink(Message message)
+    {
+        for ( IMessageSink sink : messageSinks_ )
+        {
+            message = sink.handleMessage(message);
+            if ( message == null )
+            {
+                break;
+            }
+        }
+        return message;
+    }
+
+    /**
+     * Passes the message through the sinks in reverse registration order,
+     * feeding each sink the result of the previous one.
+     *
+     * @param message the message to process.
+     * @return the result of the last sink, or null as soon as a sink returns null.
+     */
+    public static Message processServerMessageSink(Message message)
+    {
+        /* start behind the last element and walk towards the front */
+        for ( ListIterator<IMessageSink> cursor = messageSinks_.listIterator(messageSinks_.size()); cursor.hasPrevious(); )
+        {
+            message = cursor.previous().handleMessage(message);
+            if ( message == null )
+            {
+                break;
+            }
+        }
+        return message;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/procedures/GroovyScriptRunner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/procedures/GroovyScriptRunner.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/procedures/GroovyScriptRunner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/procedures/GroovyScriptRunner.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,13 @@
+package org.apache.cassandra.procedures;
+
+import groovy.lang.GroovyShell;
+
+/**
+ * Evaluates Groovy scripts with a single GroovyShell shared by all callers.
+ *
+ * NOTE(review): the script text is executed as is. Callers must not pass
+ * script text that comes from an untrusted source.
+ */
+public class GroovyScriptRunner
+{
+	private static GroovyShell groovyShell_ = new GroovyShell();
+
+	/**
+	 * Evaluates the given script and returns its result as a string.
+	 *
+	 * @param script the Groovy source to evaluate.
+	 * @return the string form of the result; the text "null" if the script
+	 *         evaluates to null (for example when it ends in a statement
+	 *         that yields no value).
+	 */
+	public static String evaluateString(String script)
+	{
+		Object result = groovyShell_.evaluate(script);
+		/* String.valueOf avoids the NullPointerException that toString() on a null result caused */
+		return String.valueOf(result);
+	}
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/BootstrapAndLbHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/BootstrapAndLbHelper.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/BootstrapAndLbHelper.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/BootstrapAndLbHelper.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,64 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.SSTable;
+import org.apache.log4j.Logger;
+
+/**
+ * Static helpers that pick a token from the indexed keys for which
+ * StorageService.isPrimary() holds.
+ */
+public final class BootstrapAndLbHelper
+{
+    private static final Logger logger_ = Logger.getLogger(BootstrapAndLbHelper.class);
+
+    /*
+     * Fetches the indexed keys from SSTable and removes, in place, every key
+     * that StorageService.isPrimary() rejects.
+     * NOTE(review): the list returned by SSTable.getIndexedKeys() is modified
+     * directly - confirm that it is a private copy.
+    */
+    private static List<String> getIndexedPrimaryKeys()
+    {
+        List<String> keys = SSTable.getIndexedKeys();
+        for ( Iterator<String> cursor = keys.iterator(); cursor.hasNext(); )
+        {
+            if ( !StorageService.instance().isPrimary(cursor.next()) )
+            {
+                cursor.remove();
+            }
+        }
+        return keys;
+    }
+
+    /**
+     * Given the number of keys that need to be transferred say, 1000
+     * and given the smallest key stored we need the hash of the 1000th
+     * key greater than the smallest key in the sorted order in the primary
+     * range. The key is looked up at position keyCount / index interval in
+     * the list of indexed primary keys; a position past the end falls back
+     * to the last key.
+     *
+     * If there are no indexed primary keys the lookup fails with an
+     * IndexOutOfBoundsException.
+     *
+     * @param keyCount number of keys after which token is required.
+     * @return token.
+    */
+    public static BigInteger getTokenBasedOnPrimaryCount(int keyCount)
+    {
+        List<String> primaryKeys = getIndexedPrimaryKeys();
+        int position = keyCount / SSTable.indexInterval();
+        int last = primaryKeys.size() - 1;
+        /* clamp to the last key when the position runs past the end */
+        String key = primaryKeys.get( Math.min(position, last) );
+        logger_.debug("Hashing key " + key + " ...");
+        return StorageService.instance().hash(key);
+    }
+}