You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [25/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpRequest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpRequest.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpRequest.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpRequest.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Encapsulates a HTTP request.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.io.*;
+import java.net.URLDecoder;
+import java.nio.*;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpRequest
+{
+    /* Header name -> header value, exactly as passed to addHeader(). */
+    private Map<String, String> headersMap_ = new HashMap<String, String>();
+    /* Parameter name -> URL-decoded parameter value. */
+    private Map<String, String> paramsMap_ = new HashMap<String, String>();
+    /* Accumulated body text; every call to setBody() appends to it. */
+    private String sBody_ = "";
+    private String method_;
+    private String path_;
+    private String query_;
+    private String version_;
+
+    /**
+     * Returns the type of method - GET, POST, etc.
+     *
+     * @return the method given to setStartLine(), or null if it was never set
+     */
+    public String getMethod()
+    {
+        return method_;
+    }
+
+    /**
+     * Gets the request path referenced by the request.
+     * For example, if the URL is of the form:
+     *   http://somedomain:PORT/some/path?param=value
+     * this function will return
+     *   "/some/path"
+     *
+     * @return the path given to setStartLine(), or null if it was never set
+     */
+    public String getPath()
+    {
+        return path_;
+    }
+
+    /**
+     * Gets the query in the request.
+     * For example, if the URL is of the form:
+     *   http://somedomain:PORT/some/path?param=value
+     * this function will return
+     *   "param=value"
+     *
+     * @return the query given to setStartLine(), or null if it was never set
+     */
+    public String getQuery()
+    {
+        return query_;
+    }
+
+    /**
+     * Returns the supported HTTP protocol version.
+     * This is always the literal "HTTP/1.1"; the version passed to
+     * setStartLine() is stored but only reported by toString().
+     */
+    public String getVersion()
+    {
+        return "HTTP/1.1";
+    }
+
+    /**
+     * Adds an entry to the map of header name-values in the HTTP request.
+     * A later value for the same name replaces the earlier one.
+     */
+    public void addHeader(String name, String value)
+    {
+        headersMap_.put(name, value);
+    }
+
+    /**
+     * For a given name, returns the value if it was in the headers.
+     * Returns the empty string otherwise.
+     */
+    public String getHeader(String name)
+    {
+        String value = headersMap_.get(name);
+        return (value == null) ? "" : value;
+    }
+
+    /**
+     * Stores a parameter after URL-decoding its value as UTF-8.
+     * If the value cannot be decoded it is stored unchanged.
+     *
+     * @param name  parameter name, stored as is (it is not decoded)
+     * @param value URL-encoded parameter value
+     */
+    public void setParameter(String name, String value)
+    {
+        String decodedValue = value;
+        try
+        {
+            decodedValue = URLDecoder.decode(value, "UTF-8");
+        }
+        catch (Exception ignored)
+        {
+            // Deliberate best effort: a malformed escape sequence (or a null
+            // value) must not fail the request, so keep the raw value.
+        }
+        paramsMap_.put(name, decodedValue);
+    }
+
+    /**
+     * Returns the value of a parameter parsed from the body of the HTTP
+     * message, or null if no such parameter exists.
+     *
+     * For example, if the body is of the form:
+     *   a=b&c=d
+     * this function will:
+     *   return "b" when called as getParameter("a")
+     */
+    public String getParameter(String name)
+    {
+        return paramsMap_.get(name);
+    }
+
+    /**
+     * Appends the content of the given byte buffers to the body and then
+     * parses the whole accumulated body as '&'-separated name=value pairs.
+     *
+     * Each pair is split on its first '=' only, so a value may itself contain
+     * '='. A pair without any '=' carries no value and is skipped; it no
+     * longer stops the remaining pairs from being parsed. A null list is
+     * ignored, as are null or empty buffers.
+     *
+     * @param bodyBuffers buffers making up (the next part of) the body; the
+     *                    remaining bytes of every buffer are consumed
+     */
+    public void setBody(List<ByteBuffer> bodyBuffers)
+    {
+        if (bodyBuffers == null)
+        {
+            return;
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for (ByteBuffer bb : bodyBuffers)
+        {
+            if (bb == null || !bb.hasRemaining())
+            {
+                continue;
+            }
+            byte[] chunk = new byte[bb.remaining()];
+            bb.get(chunk);
+            // NOTE(review): decodes with the platform default charset, as
+            // before; url-encoded form bodies are expected to be ASCII.
+            sb.append(new String(chunk));
+        }
+        sBody_ += sb.toString();
+
+        // once we are done with the body, parse the parameters
+        for (String pair : sBody_.split("&"))
+        {
+            String[] nameValue = pair.split("=", 2);
+            if (nameValue.length < 2)
+            {
+                continue;
+            }
+            setParameter(nameValue[0], nameValue[1]);
+        }
+    }
+
+    /**
+     * Returns the body text accumulated so far (never null).
+     */
+    public String getBody()
+    {
+        return sBody_;
+    }
+
+    /**
+     * Records the components of the request's start line.
+     */
+    public void setStartLine(String method, String path, String query, String version)
+    {
+        method_ = method;
+        path_ = path;
+        query_ = query;
+        version_ = version;
+    }
+
+    /**
+     * Multi-line dump of the start line, headers and body, for debugging.
+     */
+    public String toString()
+    {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+
+        pw.println("HttpRequest-------->");
+        pw.println("method = " + method_ + ", path = " + path_ + ", query = " + query_ + ", version = " + version_);
+        pw.println("Headers: " + headersMap_.toString());
+        pw.println("Body: " + sBody_);
+        pw.println("<--------HttpRequest");
+
+        return sw.toString();
+    }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpResponse.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpResponse.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpResponse.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Encapsulates an HTTP response.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.io.*;
+import java.nio.*;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpResponse
+{
+    /* Header name -> header value, exactly as passed to addHeader(). */
+    private Map<String, String> headersMap_ = new HashMap<String, String>();
+    private String sBody_ = null;
+    private String method_ = null;
+    private String path_ = null;
+    private String version_ = null;
+
+    /**
+     * Returns the method given to setStartLine(), or null if it was never set.
+     */
+    public String getMethod()
+    {
+        return method_;
+    }
+
+    /**
+     * Returns the path given to setStartLine(), or null if it was never set.
+     */
+    public String getPath()
+    {
+        return path_;
+    }
+
+    /**
+     * Always returns the literal "HTTP/1.1"; the version passed to
+     * setStartLine() is stored but only reported by toString().
+     */
+    public String getVersion()
+    {
+        return "HTTP/1.1";
+    }
+
+    /**
+     * Adds a header; a later value for the same name replaces the earlier one.
+     */
+    public void addHeader(String name, String value)
+    {
+        headersMap_.put(name, value);
+    }
+
+    /**
+     * Returns the value of the named header, or the empty string when the
+     * header is absent (the same contract as HttpRequest.getHeader; a missing
+     * header used to raise a NullPointerException here).
+     */
+    public String getHeader(String name)
+    {
+        String value = headersMap_.get(name);
+        return (value == null) ? "" : value;
+    }
+
+    /**
+     * Replaces the body with the concatenated content of the given buffers
+     * and empties the list. A null list is ignored.
+     *
+     * NOTE(review): each buffer is read through asCharBuffer(), i.e. as
+     * 16-bit chars in the buffer's byte order, whereas HttpRequest.setBody
+     * decodes plain bytes. Kept as is - confirm which encoding callers
+     * actually put on the wire.
+     *
+     * @param bodyBuffers buffers making up the body; the list is drained
+     */
+    public void setBody(List<ByteBuffer> bodyBuffers)
+    {
+        if (bodyBuffers == null)
+        {
+            return;
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for (ByteBuffer bb : bodyBuffers)
+        {
+            sb.append(bb.asCharBuffer().toString());
+        }
+        // Callers observed an emptied list before; keep that side effect.
+        bodyBuffers.clear();
+        sBody_ = sb.toString();
+    }
+
+    /**
+     * Returns the body, or null if setBody() was never called.
+     */
+    public String getBody()
+    {
+        return sBody_;
+    }
+
+    /**
+     * Records the components of the start line.
+     */
+    public void setStartLine(String method, String path, String version)
+    {
+        method_ = method;
+        path_ = path;
+        version_ = version;
+    }
+
+    /**
+     * Multi-line dump of the start line, headers and body, for debugging.
+     */
+    public String toString()
+    {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+
+        pw.println("HttpResponse-------->");
+        pw.println("method = " + method_ + ", path = " + path_ + ", version = " + version_);
+        pw.println("Headers: " + headersMap_.toString());
+        pw.println("Body: " + sBody_);
+        pw.println("<--------HttpResponse");
+
+        return sw.toString();
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpStartLineParser.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpStartLineParser.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpStartLineParser.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpStartLineParser.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Incremental parser for the start line of an HTTP request
+ * (method, path, query and version).
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpStartLineParser
+{
+ private Callback callback_;
+
+ /**
+  * Receiver for a completely parsed start line.
+  */
+ public interface Callback
+ {
+     /**
+      * Called once per start line, when its terminating CR LF has been read.
+      *
+      * @param method  the request method as read from the line
+      * @param path    the path with %XX escapes already decoded
+      * @param query   the text after '?', with %XX escapes decoded and
+      *                '+' turned into a space; empty if there was no query
+      * @param version the protocol version text as read from the line
+      */
+     void onStartLine(String method, String path, String query, String version);
+ }
+
+ /**
+  * Creates a parser that reports every completed start line to cb.
+  *
+  * @param cb receiver of parsed start lines; may be null, in which case
+  *           parsed lines are dropped
+  */
+ public HttpStartLineParser(Callback cb)
+ {
+     this.callback_ = cb;
+ }
+
+ /*
+  * States of the start-line state machine driven by onMoreBytes() and
+  * onMoreBytesNew().
+  */
+ private enum StartLineParseState
+ {
+ // skipping ' ' characters; the first other character switches to nextState_
+ EATING_WHITESPACE,
+ // collecting the method up to the next ' '
+ READING_METHOD,
+ // collecting the path up to '?', ' ' or '\r'
+ READING_PATH,
+ // collecting the query (after '?') up to ' ' or '\r'
+ READING_QUERY,
+ // a '%' was seen; the next character is the high hex digit
+ DECODING_FIRST_CHAR,
+ // the next character is the low hex digit; afterwards return to nextState_
+ DECODING_SECOND_CHAR,
+ // collecting the version up to '\r'
+ READING_VERSION,
+ // a '\r' was seen; the next character must be '\n'
+ CHECKING_EOL,
+ // initial state, and the state after a complete line: buffers are cleared on the next call
+ TO_RESET
+ }
+
+ private StartLineParseState parseState_ = StartLineParseState.TO_RESET;
+ private StartLineParseState nextState_;
+ private StringBuilder httpMethod_ = new StringBuilder(32);
+ private StringBuilder httpPath_ = new StringBuilder();
+ private StringBuilder httpQuery_ = new StringBuilder(32);
+ private StringBuilder httpVersion_ = new StringBuilder();
+
+ // we decode escapes of the form %{2 digit hex number}; this is a
+ // temporary holder for the leftmost digit's value (already multiplied
+ // by 16) while the second digit is being read
+ private int encodedValue_;
+ // this points to the buffer a decoded %XX character is appended to;
+ // the parser sets it to httpPath_ or httpQuery_ when it meets a '%'
+ // in the path or in the query
+ private StringBuilder encodeTo_;
+
+ /**
+  * Prepares the parser for a new start line: the next bytes are treated as
+  * optional leading spaces followed by the method, and all four component
+  * buffers are emptied.
+  */
+ public void resetParserState()
+ {
+     parseState_ = StartLineParseState.EATING_WHITESPACE;
+     nextState_ = StartLineParseState.READING_METHOD;
+
+     httpMethod_.setLength(0);
+     httpPath_.setLength(0);
+     httpQuery_.setLength(0);
+     httpVersion_.setLength(0);
+ }
+
+ /*
+  * Hands the collected method, path, query and version to the callback.
+  * Does nothing when no callback was registered.
+  */
+ private void finishLine_()
+ {
+     if (callback_ == null)
+     {
+         return;
+     }
+
+     String method = httpMethod_.toString();
+     String path = httpPath_.toString();
+     String query = httpQuery_.toString();
+     String version = httpVersion_.toString();
+     callback_.onStartLine(method, path, query, version);
+ }
+
+ /*
+  * Returns the value (0-15) of a hexadecimal digit character, accepting
+  * both upper and lower case. Any other character yields 0; no error is
+  * reported for it.
+  */
+ private static int decodeHex(int hex)
+ {
+     if ('0' <= hex && hex <= '9')
+     {
+         return hex - '0';
+     }
+     if ('a' <= hex && hex <= 'f')
+     {
+         return hex - 'a' + 10;
+     }
+     if ('A' <= hex && hex <= 'F')
+     {
+         return hex - 'A' + 10;
+     }
+     return 0;
+ }
+
+ /**
+  * Feeds the currently available bytes of the stream to the start-line
+  * state machine, one byte at a time.
+  *
+  * Parsing state is kept between calls, so a start line may arrive in
+  * several pieces.
+  *
+  * NOTE(review): the whitespace state pushes a byte back with
+  * mark()/reset(), so this presumably requires a stream that supports
+  * marking - confirm against callers.
+  *
+  * @param in stream positioned somewhere inside (or at the start of) a start line
+  * @return true once the terminating CR LF has been read and the callback
+  *         has been notified; false if the available input ran out first
+  * @throws HttpParsingException if '\r' is not followed by '\n'
+  * @throws IOException as raised by the stream
+  */
+ public boolean onMoreBytes(InputStream in) throws HttpParsingException, IOException
+ {
+ int got;
+
+ // a previous call finished a line (or this is the first call): start afresh
+ if (parseState_ == StartLineParseState.TO_RESET)
+ {
+ resetParserState();
+ }
+
+ while (in.available() > 0)
+ {
+ // remember the position so that a single byte can be pushed back
+ in.mark(1);
+ got = in.read();
+
+ switch (parseState_)
+ {
+ case EATING_WHITESPACE:
+ switch (got)
+ {
+ case ' ':
+ break;
+ default:
+ // not a space: un-read it and let the pending state consume it
+ in.reset();
+ parseState_ = nextState_;
+ break;
+ }
+ break;
+
+ case READING_METHOD:
+ switch (got)
+ {
+ case ' ':
+ parseState_ = StartLineParseState.EATING_WHITESPACE;
+ nextState_ = StartLineParseState.READING_PATH;
+ break;
+ default:
+ httpMethod_.append((char) got);
+ break;
+ }
+ break;
+
+ case READING_PATH:
+ switch (got)
+ {
+ case '\r':
+ // line ends right after the path (no version present)
+ parseState_ = StartLineParseState.CHECKING_EOL;
+ break;
+ case '%':
+ // start of a %XX escape; come back to this state afterwards
+ encodeTo_ = httpPath_;
+ nextState_ = parseState_;
+ parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+ break;
+ case ' ':
+ parseState_ = StartLineParseState.EATING_WHITESPACE;
+ nextState_ = StartLineParseState.READING_VERSION;
+ break;
+ case '?':
+ // the '?' itself is not stored in either the path or the query
+ parseState_ = StartLineParseState.READING_QUERY;
+ break;
+ default:
+ httpPath_.append((char) got);
+ break;
+ }
+ break;
+
+ case READING_QUERY:
+ switch (got)
+ {
+ case '\r':
+ parseState_ = StartLineParseState.CHECKING_EOL;
+ break;
+ case '%':
+ // start of a %XX escape; come back to this state afterwards
+ encodeTo_ = httpQuery_;
+ nextState_ = parseState_;
+ parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+ break;
+ case ' ':
+ parseState_ = StartLineParseState.EATING_WHITESPACE;
+ nextState_ = StartLineParseState.READING_VERSION;
+ break;
+ case '+':
+ // in the query a '+' stands for a space
+ httpQuery_.append(' ');
+ break;
+ default:
+ httpQuery_.append((char) got);
+ break;
+ }
+ break;
+
+ case DECODING_FIRST_CHAR:
+ // high hex digit of a %XX escape
+ encodedValue_ = decodeHex(got) * 16;
+ parseState_ = StartLineParseState.DECODING_SECOND_CHAR;
+ break;
+
+ case DECODING_SECOND_CHAR:
+ // low hex digit: emit the decoded character and resume path/query
+ encodeTo_.append((char) (decodeHex(got) + encodedValue_));
+ parseState_ = nextState_;
+ break;
+
+ case READING_VERSION:
+ switch (got)
+ {
+ case '\r':
+ parseState_ = StartLineParseState.CHECKING_EOL;
+ break;
+ default:
+ httpVersion_.append((char) got);
+ break;
+ }
+ break;
+
+ case CHECKING_EOL:
+ switch (got)
+ {
+ case '\n':
+ // complete line: notify the callback and arm a reset for the next call
+ finishLine_();
+ parseState_ = StartLineParseState.TO_RESET;
+ return true;
+ default:
+ throw new HttpParsingException();
+ }
+
+ default:
+ throw new HttpParsingException();
+ }
+ }
+
+ // ran out of available input before the end of the line
+ return false;
+ }
+
+ /**
+  * ByteBuffer-based variant of onMoreBytes(): feeds the bytes between the
+  * buffer's position and limit to the start-line state machine.
+  *
+  * Parsing state is kept between calls, so a start line may arrive in
+  * several buffers. On a normal return the buffer's position is advanced
+  * past every byte that was consumed; when an exception is thrown the
+  * position is left untouched.
+  *
+  * Every byte is widened as an unsigned value (0..255), matching what
+  * InputStream.read() delivers to onMoreBytes(); previously bytes of 0x80
+  * and above were sign-extended and ended up as characters in the
+  * \uFF80-\uFFFF range.
+  *
+  * @param buffer buffer holding (part of) a start line
+  * @return true once the terminating CR LF has been read and the callback
+  *         has been notified; false if the buffer was exhausted first
+  * @throws HttpParsingException if '\r' is not followed by '\n'
+  * @throws IOException declared for symmetry with onMoreBytes(); nothing
+  *         in this method raises it
+  */
+ public boolean onMoreBytesNew(ByteBuffer buffer) throws HttpParsingException, IOException
+ {
+     int got;
+     int limit = buffer.limit();
+     int pos = buffer.position();
+
+     // a previous call finished a line (or this is the first call): start afresh
+     if (parseState_ == StartLineParseState.TO_RESET)
+     {
+         resetParserState();
+     }
+
+     while (pos < limit)
+     {
+         switch (parseState_)
+         {
+             case EATING_WHITESPACE:
+                 while (pos < limit && buffer.get(pos) == ' ')
+                 {
+                     pos++;
+                 }
+                 // only switch once a non-space byte is actually available
+                 if (pos < limit)
+                 {
+                     parseState_ = nextState_;
+                 }
+                 break;
+
+             case READING_METHOD:
+                 while (pos < limit && (got = buffer.get(pos) & 0xFF) != ' ')
+                 {
+                     httpMethod_.append((char) got);
+                     pos++;
+                 }
+
+                 if (pos < limit)
+                 {
+                     // stopped on a space; the whitespace state consumes it
+                     parseState_ = StartLineParseState.EATING_WHITESPACE;
+                     nextState_ = StartLineParseState.READING_PATH;
+                 }
+                 break;
+
+             case READING_PATH:
+                 while (pos < limit && parseState_ == StartLineParseState.READING_PATH)
+                 {
+                     got = buffer.get(pos++) & 0xFF;
+
+                     switch (got)
+                     {
+                         case '\r':
+                             // line ends right after the path (no version present)
+                             parseState_ = StartLineParseState.CHECKING_EOL;
+                             break;
+                         case '%':
+                             // start of a %XX escape; come back to this state afterwards
+                             encodeTo_ = httpPath_;
+                             nextState_ = parseState_;
+                             parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+                             break;
+                         case ' ':
+                             parseState_ = StartLineParseState.EATING_WHITESPACE;
+                             nextState_ = StartLineParseState.READING_VERSION;
+                             break;
+                         case '?':
+                             // the '?' itself is not stored in either the path or the query
+                             parseState_ = StartLineParseState.READING_QUERY;
+                             break;
+                         default:
+                             httpPath_.append((char) got);
+                             break;
+                     }
+                 }
+                 break;
+
+             case READING_QUERY:
+                 while (pos < limit && parseState_ == StartLineParseState.READING_QUERY)
+                 {
+                     got = buffer.get(pos++) & 0xFF;
+
+                     switch (got)
+                     {
+                         case '\r':
+                             parseState_ = StartLineParseState.CHECKING_EOL;
+                             break;
+                         case '%':
+                             // start of a %XX escape; come back to this state afterwards
+                             encodeTo_ = httpQuery_;
+                             nextState_ = parseState_;
+                             parseState_ = StartLineParseState.DECODING_FIRST_CHAR;
+                             break;
+                         case ' ':
+                             parseState_ = StartLineParseState.EATING_WHITESPACE;
+                             nextState_ = StartLineParseState.READING_VERSION;
+                             break;
+                         case '+':
+                             // in the query a '+' stands for a space
+                             httpQuery_.append(' ');
+                             break;
+                         default:
+                             httpQuery_.append((char) got);
+                             break;
+                     }
+                 }
+                 break;
+
+             case DECODING_FIRST_CHAR:
+                 // high hex digit of a %XX escape
+                 got = buffer.get(pos++) & 0xFF;
+                 encodedValue_ = decodeHex(got) * 16;
+                 parseState_ = StartLineParseState.DECODING_SECOND_CHAR;
+                 break;
+
+             case DECODING_SECOND_CHAR:
+                 // low hex digit: emit the decoded character and resume path/query
+                 got = buffer.get(pos++) & 0xFF;
+                 encodeTo_.append((char) (decodeHex(got) + encodedValue_));
+                 parseState_ = nextState_;
+                 break;
+
+             case READING_VERSION:
+                 while (pos < limit && (got = buffer.get(pos) & 0xFF) != '\r')
+                 {
+                     httpVersion_.append((char) got);
+                     pos++;
+                 }
+                 if (pos < limit)
+                 {
+                     parseState_ = StartLineParseState.CHECKING_EOL;
+                     pos++; // skipping '\r'
+                 }
+                 break;
+
+             case CHECKING_EOL:
+                 switch (buffer.get(pos++))
+                 {
+                     case '\n':
+                         // complete line: notify the callback and arm a reset for the next call
+                         finishLine_();
+                         parseState_ = StartLineParseState.TO_RESET;
+                         buffer.position(pos);
+                         return true; // could have reached limit here
+                     default:
+                         throw new HttpParsingException();
+                 }
+
+             default:
+                 throw new HttpParsingException();
+         }
+     }
+
+     // buffer exhausted before the end of the line; remember how far we got
+     buffer.position(pos);
+     return false;
+ }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpWriteResponse.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpWriteResponse.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpWriteResponse.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpWriteResponse.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class writes the HTTP 1.1 responses back to the client.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.io.*;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HttpWriteResponse
+{
+    /* The request being answered; only its protocol version is used. */
+    private HttpRequest httpRequest_ = null;
+    /* Response body collected through println(). */
+    private StringBuilder body_ = new StringBuilder();
+
+    /**
+     * @param httpRequest the request this response answers; must not be
+     *                    null by the time flush() is called
+     */
+    public HttpWriteResponse(HttpRequest httpRequest)
+    {
+        httpRequest_ = httpRequest;
+    }
+
+    /**
+     * Appends one line, followed by the platform line separator, to the
+     * response body. A null line is ignored.
+     */
+    public void println(String responseLine)
+    {
+        if (responseLine != null)
+        {
+            body_.append(responseLine);
+            body_.append(System.getProperty("line.separator"));
+        }
+    }
+
+    /**
+     * Builds the complete "200 OK" response: status line, headers, a blank
+     * line and the body collected so far.
+     *
+     * Content-Length is the number of encoded body bytes. It used to be the
+     * number of characters, which is too small as soon as the body holds a
+     * character that encodes to more than one byte. The header is omitted
+     * when the body is empty.
+     *
+     * @return a buffer positioned at 0 whose remaining bytes are the
+     *         response, encoded with the platform default charset
+     * @throws Exception kept for backward compatibility with existing callers
+     */
+    public ByteBuffer flush() throws Exception
+    {
+        // Encode the body first so that Content-Length counts bytes, not chars.
+        byte[] bodyBytes = body_.toString().getBytes();
+
+        StringBuilder headers = new StringBuilder();
+        headers.append(httpRequest_.getVersion()).append(" 200 OK\r\n");
+        headers.append("Content-Type: text/html\r\n");
+        if (body_.length() > 0)
+        {
+            headers.append("Content-Length: ").append(bodyBytes.length).append("\r\n");
+        }
+        headers.append("Cache-Control: no-cache\r\n");
+        headers.append("Pragma: no-cache\r\n");
+
+        // terminate the headers
+        headers.append("\r\n");
+        byte[] headerBytes = headers.toString().getBytes();
+
+        // headers first, then the body; flip so the caller can read from 0
+        ByteBuffer buffer = ByteBuffer.allocate(headerBytes.length + bodyBytes.length);
+        buffer.put(headerBytes);
+        buffer.put(bodyBytes);
+        buffer.flip();
+        return buffer;
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentLengthState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentLengthState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentLengthState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentLengthState.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+
+import org.apache.cassandra.utils.FBUtilities;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ContentLengthState extends StartState
+{
+    /* Receives the 4 bytes that are interpreted as the content length. */
+    private ByteBuffer buffer_;
+
+    ContentLengthState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(4);
+    }
+
+    /**
+     * Reads into the 4-byte length buffer via doRead()
+     * (presumably inherited from StartState - not visible here).
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Converts the buffered bytes to an int and switches the reader to the
+     * CONTENT state, creating and registering that state on first use and
+     * re-targeting the registered one otherwise. The length buffer is
+     * cleared afterwards so that this state can be used again.
+     */
+    public void morphState() throws IOException
+    {
+        int contentLength = FBUtilities.byteArrayToInt(buffer_.array());
+        StartState contentState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT);
+        if ( contentState != null )
+        {
+            contentState.setContextData(contentLength);
+        }
+        else
+        {
+            contentState = new ContentState(stream_, contentLength);
+            stream_.putSocketState( TcpReader.TcpReaderState.CONTENT, contentState );
+        }
+        stream_.morphState( contentState );
+        buffer_.clear();
+    }
+
+    /**
+     * Not supported: this state takes no context data.
+     */
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ContentLengthState");
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentState.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ContentState extends StartState
+{
+    /* Receives the message content; its usable size is length_ bytes. */
+    private ByteBuffer buffer_;
+    /* Number of content bytes expected for the current message. */
+    private int length_;
+
+    ContentState(TcpReader stream, int length)
+    {
+        super(stream);
+        length_ = length;
+        buffer_ = ByteBuffer.allocate(length_);
+    }
+
+    /**
+     * Reads into the content buffer via doRead()
+     * (presumably inherited from StartState - not visible here).
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Switches the reader to the DONE state and hands it a copy of the
+     * content, creating and registering that state on first use.
+     */
+    public void morphState() throws IOException
+    {
+        StartState doneState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+        byte[] content = toBytes();
+        if ( doneState != null )
+        {
+            doneState.setContextData(content);
+        }
+        else
+        {
+            doneState = new DoneState(stream_, content);
+            stream_.putSocketState( TcpReader.TcpReaderState.DONE, doneState );
+        }
+        stream_.morphState( doneState );
+    }
+
+    /*
+     * Copies the first length_ bytes of the buffer into a new array.
+     * Leaves the buffer positioned after the copied bytes.
+     */
+    private byte[] toBytes()
+    {
+        byte[] copy = new byte[length_];
+        buffer_.position(0);
+        buffer_.get(copy, 0, copy.length);
+        return copy;
+    }
+
+    /**
+     * Prepares this state for a message of a new length.
+     * The existing buffer is reused (with its limit set to the new length)
+     * when it is large enough; otherwise a new one is allocated.
+     *
+     * @param data the new content length, as an Integer
+     */
+    public void setContextData(Object data)
+    {
+        length_ = ((Integer) data).intValue();
+        buffer_.clear();
+        if ( length_ <= buffer_.capacity() )
+        {
+            buffer_.limit(length_);
+        }
+        else
+        {
+            buffer_ = ByteBuffer.allocate(length_);
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentStreamState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentStreamState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentStreamState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ContentStreamState.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.io.*;
+
+import org.apache.cassandra.db.Table;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+class ContentStreamState extends StartState
+{
+    private static Logger logger_ = Logger.getLogger(ContentStreamState.class);
+    /* Upper bound on the number of bytes requested per transferFrom() call. */
+    private static long count_ = 64*1024*1024;
+    /* Return this byte array to exit event loop */
+    private static byte[] bytes_ = new byte[1];
+    /* Bytes written to the target file so far; also the next write offset. */
+    private long bytesRead_ = 0L;
+    /* Channel onto the target file; null while no file is open. */
+    private FileChannel fc_;
+    private StreamContextManager.StreamContext streamContext_;
+    private StreamContextManager.StreamStatus streamStatus_;
+
+    /**
+     * Looks up the stream context and status registered for the host name
+     * of the remote end of the reader's socket.
+     */
+    ContentStreamState(TcpReader stream)
+    {
+        super(stream);
+        SocketChannel socketChannel = stream.getStream();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        String remoteHost = remoteAddress.getHostName();
+        streamContext_ = StreamContextManager.getStreamContext(remoteHost);
+        streamStatus_ = StreamContextManager.getStreamStatus(remoteHost);
+    }
+
+    /* Opens the target file for appending unless a channel is already open. */
+    private void createFileChannel() throws IOException
+    {
+        if ( fc_ == null )
+        {
+            logger_.debug("Creating file for " + streamContext_.getTargetFile());
+            FileOutputStream fos = new FileOutputStream( streamContext_.getTargetFile(), true );
+            fc_ = fos.getChannel();
+        }
+    }
+
+    /*
+     * Closes the file channel, if any, and forgets it so that a closed
+     * channel is never handed to transferFrom() again.
+     */
+    private void closeFileChannel() throws IOException
+    {
+        if ( fc_ != null )
+        {
+            FileChannel channel = fc_;
+            fc_ = null;
+            channel.close();
+        }
+    }
+
+    /**
+     * Transfers the bytes currently available on the socket into the target
+     * file of the stream context.
+     *
+     * When no stream context is registered for the remote host nothing is
+     * read. (The file channel used to be created before this check, which
+     * dereferenced the missing context.)
+     *
+     * On an I/O failure the source is asked to re-stream the file, the
+     * file channel is closed and the partial file is deleted before the
+     * exception is rethrown. (The channel used to stay open on the deleted
+     * file.)
+     *
+     * @return always an empty array
+     * @throws ReadNotCompleteException while fewer bytes than expected have arrived
+     * @throws IOException if the transfer fails
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        if ( streamContext_ == null )
+        {
+            return new byte[0];
+        }
+
+        SocketChannel socketChannel = stream_.getStream();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        String remoteHost = remoteAddress.getHostName();
+        createFileChannel();
+        try
+        {
+            bytesRead_ += fc_.transferFrom(socketChannel, bytesRead_, ContentStreamState.count_);
+            if ( bytesRead_ != streamContext_.getExpectedBytes() )
+                throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+        }
+        catch ( IOException ex )
+        {
+            /* Release the file before it is deleted; a failure to close must not mask ex. */
+            try
+            {
+                closeFileChannel();
+            }
+            catch ( IOException ignored )
+            {
+                logger_.debug("Unable to close " + streamContext_.getTargetFile());
+            }
+            /* The partial file is discarded below, so a retry starts from offset 0. */
+            bytesRead_ = 0L;
+            /* Ask the source node to re-stream this file. */
+            streamStatus_.setAction(StreamContextManager.StreamCompletionAction.STREAM);
+            handleStreamCompletion(remoteHost);
+            /* Delete the orphaned file. */
+            File file = new File(streamContext_.getTargetFile());
+            file.delete();
+            throw ex;
+        }
+
+        if ( bytesRead_ == streamContext_.getExpectedBytes() )
+        {
+            logger_.debug("Removing stream context " + streamContext_);
+            handleStreamCompletion(remoteHost);
+            bytesRead_ = 0L;
+            closeFileChannel();
+            morphState();
+        }
+
+        return new byte[0];
+    }
+
+    /*
+     * Notifies the stream completion handler registered for the remote
+     * host, if there is one, passing it the stream context and the current
+     * status so that the source may perform the requisite cleanup (or
+     * re-stream, depending on the status action).
+     */
+    private void handleStreamCompletion(String remoteHost) throws IOException
+    {
+        IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
+        if ( streamComplete != null )
+        {
+            streamComplete.onStreamCompletion(remoteHost, streamContext_, streamStatus_);
+        }
+    }
+
+    /**
+     * Switches the reader to the DONE state, creating and registering that
+     * state on first use.
+     */
+    public void morphState() throws IOException
+    {
+        /* We hand over an array of size 1 so that we can exit the event loop of the read. */
+        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+        if ( nextState == null )
+        {
+            nextState = new DoneState(stream_, ContentStreamState.bytes_);
+            stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
+        }
+        else
+        {
+            nextState.setContextData(ContentStreamState.bytes_);
+        }
+        stream_.morphState( nextState );
+    }
+
+    /**
+     * Not supported: this state takes no context data.
+     */
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ContentStreamState");
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/DoneState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/DoneState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/DoneState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/DoneState.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class DoneState extends StartState
+{
+ private byte[] bytes_ = new byte[0];
+
+ DoneState(TcpReader stream, byte[] bytes)
+ {
+ super(stream);
+ bytes_ = bytes;
+ }
+
+ public byte[] read() throws IOException, ReadNotCompleteException
+ {
+ morphState();
+ return bytes_;
+ }
+
+ public void morphState() throws IOException
+ {
+ stream_.morphState(null);
+ }
+
+ public void setContextData(Object data)
+ {
+ bytes_ = (byte[])data;
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/FastSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/FastSerializer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/FastSerializer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/FastSerializer.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+
+/**
+ * ISerializer implementation that delegates to Message.serializer(), using
+ * DataOutputBuffer / DataInputBuffer as the in-memory streams.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastSerializer implements ISerializer
+{
+    /**
+     * Serializes the message into a fresh DataOutputBuffer.
+     *
+     * @param message message to serialize
+     * @return whatever DataOutputBuffer.getData() yields after the write
+     * @throws IOException if the message serializer fails
+     */
+    public byte[] serialize(Message message) throws IOException
+    {
+        DataOutputBuffer out = new DataOutputBuffer();
+        Message.serializer().serialize(message, out);
+        return out.getData();
+    }
+
+    /**
+     * Rebuilds a message from the full contents of the given byte array.
+     *
+     * @param bytes serialized form; all bytes.length bytes are made available to the deserializer
+     * @return the deserialized message
+     * @throws IOException if the message serializer fails
+     */
+    public Message deserialize(byte[] bytes) throws IOException
+    {
+        DataInputBuffer in = new DataInputBuffer();
+        in.reset(bytes, bytes.length);
+        return Message.serializer().deserialize(in);
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ISerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ISerializer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ISerializer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ISerializer.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.Message;
+/**
+ * Converts a Message to and from a byte array.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface ISerializer
+{
+    /**
+     * @param message message to convert
+     * @return the serialized form of the message
+     * @throws IOException if serialization fails
+     */
+    byte[] serialize(Message message) throws IOException;
+
+    /**
+     * @param bytes serialized form of a message
+     * @return the reconstructed message
+     * @throws IOException if deserialization fails
+     */
+    Message deserialize(byte[] bytes) throws IOException;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/IStreamComplete.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/IStreamComplete.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/IStreamComplete.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Callback invoked when a stream from a host has been handled.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public interface IStreamComplete
+{
+    /**
+     * Called, when this callback is registered with the StreamContextManager,
+     * once the stream from a host is completely handled.
+     *
+     * @param from          host the stream came from
+     * @param streamContext context of the stream that was handled
+     * @param streamStatus  status recorded for that stream
+     * @throws IOException if the callback fails
+     */
+    void onStreamCompletion(String from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolHeaderState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolHeaderState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolHeaderState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolHeaderState.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Reader state that consumes the 4-byte protocol header, records its fields
+ * on the reader's ProtocolHeader and then moves the reader to either the
+ * CONTENT_STREAM state (streaming mode) or the CONTENT_LENGTH state.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ProtocolHeaderState extends StartState
+{
+    /* Holds the 4 header bytes; cleared after a successful transition so the state can be reused. */
+    private ByteBuffer buffer_;
+
+    public ProtocolHeaderState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(4);
+    }
+
+    /**
+     * Reads header bytes from the channel; see StartState.doRead for the contract.
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Decodes the header and switches the reader to the next state.
+     *
+     * NOTE(review): the exact meaning of the getBits(value, position, count)
+     * arguments is defined in MessagingService, outside this view - confirm.
+     *
+     * @throws IOException if the header's version is greater than MessagingService.getVersion()
+     */
+    public void morphState() throws IOException
+    {
+        int header = MessagingService.byteArrayToInt(buffer_.array());
+
+        stream_.getProtocolHeader().serializerType_ = MessagingService.getBits(header, 1, 2);
+
+        stream_.getProtocolHeader().isStreamingMode_ = ( MessagingService.getBits(header, 3, 1) == 1 );
+        if ( stream_.getProtocolHeader().isStreamingMode_ )
+        {
+            MessagingService.setStreamingMode(true);
+        }
+
+        stream_.getProtocolHeader().isListening_ = ( MessagingService.getBits(header, 4, 1) == 1 );
+
+        int version = MessagingService.getBits(header, 15, 8);
+        stream_.getProtocolHeader().version_ = version;
+
+        if ( version > MessagingService.getVersion() )
+        {
+            throw new IOException("Invalid version in message. Scram.");
+        }
+
+        StartState nextState = stream_.getProtocolHeader().isStreamingMode_
+                             ? contentStreamState()
+                             : contentLengthState();
+        stream_.morphState( nextState );
+        buffer_.clear();
+    }
+
+    /* Returns the reader's cached CONTENT_STREAM state, creating and caching it on first use. */
+    private StartState contentStreamState()
+    {
+        StartState state = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_STREAM);
+        if ( state == null )
+        {
+            state = new ContentStreamState(stream_);
+            stream_.putSocketState( TcpReader.TcpReaderState.CONTENT_STREAM, state );
+        }
+        return state;
+    }
+
+    /* Returns the reader's cached CONTENT_LENGTH state, creating and caching it on first use. */
+    private StartState contentLengthState()
+    {
+        StartState state = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_LENGTH);
+        if ( state == null )
+        {
+            state = new ContentLengthState(stream_);
+            stream_.putSocketState( TcpReader.TcpReaderState.CONTENT_LENGTH, state );
+        }
+        return state;
+    }
+
+    /**
+     * Not supported by this state.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ProtocolHeaderState");
+    }
+}
+
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ProtocolState.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Reader state that consumes the 16-byte protocol preamble, validates it with
+ * MessagingService.isProtocolValid and then hands over to ProtocolHeaderState.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class ProtocolState extends StartState
+{
+    /* Holds the 16 preamble bytes; cleared after a successful transition so the state can be reused. */
+    private ByteBuffer buffer_;
+
+    public ProtocolState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(16);
+    }
+
+    /**
+     * Reads preamble bytes from the channel; see StartState.doRead for the contract.
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Validates the preamble and switches the reader to the protocol header state.
+     *
+     * @throws IOException if MessagingService rejects the preamble
+     */
+    public void morphState() throws IOException
+    {
+        if ( !MessagingService.isProtocolValid(buffer_.array()) )
+        {
+            throw new IOException("Invalid protocol header. The preamble seems to be messed up.");
+        }
+
+        /* The header state is cached on the reader under the PROTOCOL key. */
+        StartState headerState = stream_.getSocketState(TcpReader.TcpReaderState.PROTOCOL);
+        if ( headerState == null )
+        {
+            headerState = new ProtocolHeaderState(stream_);
+            stream_.putSocketState( TcpReader.TcpReaderState.PROTOCOL, headerState );
+        }
+        stream_.morphState( headerState );
+        buffer_.clear();
+    }
+
+    /**
+     * Not supported by this state.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ProtocolState");
+    }
+}
+
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ReadNotCompleteException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ReadNotCompleteException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ReadNotCompleteException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/ReadNotCompleteException.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+/**
+ * Checked exception signalling that a read did not fill its buffer.
+ * StartState.doRead throws it when the buffer still has bytes remaining
+ * after a channel read.
+ *
+ * The constructor is package-private, so only classes in this package can
+ * create instances.
+ */
+public class ReadNotCompleteException extends Exception
+{
+ /**
+ * @param message description of the incomplete read
+ */
+ ReadNotCompleteException(String message)
+ {
+ super(message);
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerAttribute.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerAttribute.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerAttribute.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerAttribute.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.lang.annotation.*;
+
+/**
+ * Annotation that carries a SerializerType. It is retained at runtime, so the
+ * value can be read through reflection.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SerializerAttribute
+{
+ /* The serializer type associated with the annotated element. */
+ SerializerType value();
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerType.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerType.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/SerializerType.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+/**
+ * Serialization formats that can be named by a SerializerAttribute.
+ *
+ * NOTE(review): ProtocolHeaderState stores an int "serializerType_" decoded
+ * from the wire; if that int is an ordinal of this enum, the constant order
+ * must not change - confirm against MessagingService.
+ */
+public enum SerializerType
+{
+ BINARY,
+ JAVA,
+ XML,
+ JSON
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StartState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StartState.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StartState.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StartState.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+/**
+ * Base class for the states of the TcpReader state machine. Each state reads
+ * its portion of the input and then morphs the reader into the next state.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public abstract class StartState
+{
+    /* The reader whose state this object represents. */
+    protected TcpReader stream_;
+
+    public StartState(TcpReader stream)
+    {
+        stream_ = stream;
+    }
+
+    /** Reads this state's portion of the input. */
+    public abstract byte[] read() throws IOException, ReadNotCompleteException;
+
+    /** Switches the reader to the state that follows this one. */
+    public abstract void morphState() throws IOException;
+
+    /** Supplies state-specific data; states that take none may reject the call. */
+    public abstract void setContextData(Object data);
+
+    /**
+     * Performs a single read from the reader's socket channel into the buffer.
+     * Once the buffer is full, morphState() is invoked and an empty array is returned.
+     *
+     * @param buffer destination buffer; it counts as complete when nothing remains
+     * @return an empty byte array once the buffer is full
+     * @throws IOException if the channel reports end-of-stream while the buffer still has room
+     * @throws ReadNotCompleteException if the buffer is not yet full after this read
+     */
+    protected byte[] doRead(ByteBuffer buffer) throws IOException, ReadNotCompleteException
+    {
+        SocketChannel channel = stream_.getStream();
+        int count = channel.read(buffer);
+        int pending = buffer.remaining();
+
+        if ( count == -1 && pending > 0 )
+        {
+            throw new IOException("Reached an EOL or something bizzare occured. Reading from: " + channel.socket().getInetAddress() + " BufferSizeRemaining: " + pending);
+        }
+        if ( pending != 0 )
+        {
+            throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+        }
+
+        morphState();
+        return new byte[0];
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StreamContextManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StreamContextManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/StreamContextManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.BootstrapInitiateMessage;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StreamContextManager
+{
+ private static Logger logger_ = Logger.getLogger(StreamContextManager.class);
+
+ /*
+ * Action carried by a StreamStatus. StreamStatusSerializer writes the
+ * ordinal of this enum to the wire, so the order of the constants must
+ * not change. A new StreamStatus starts out with DELETE.
+ */
+ public static enum StreamCompletionAction
+ {
+ DELETE,
+ STREAM
+ }
+
+ /**
+  * Describes one stream: the target file and the number of bytes expected.
+  *
+  * Two contexts are equal when their target files are equal; the expected
+  * byte count does not take part in equals() or hashCode().
+  */
+ public static class StreamContext implements Serializable
+ {
+     private static Logger logger_ = Logger.getLogger(StreamContextManager.StreamContext.class);
+     private static ICompactSerializer<StreamContext> serializer_;
+
+     static
+     {
+         serializer_ = new StreamContextSerializer();
+     }
+
+     /** @return the shared compact serializer for stream contexts */
+     public static ICompactSerializer<StreamContext> serializer()
+     {
+         return serializer_;
+     }
+
+     private String targetFile_;
+     private long expectedBytes_;
+
+     /**
+      * @param targetFile    name of the target file
+      * @param expectedBytes number of bytes expected for this stream
+      */
+     public StreamContext(String targetFile, long expectedBytes)
+     {
+         targetFile_ = targetFile;
+         expectedBytes_ = expectedBytes;
+     }
+
+     public String getTargetFile()
+     {
+         return targetFile_;
+     }
+
+     public void setTargetFile(String file)
+     {
+         targetFile_ = file;
+     }
+
+     public long getExpectedBytes()
+     {
+         return expectedBytes_;
+     }
+
+     /**
+      * Contexts are equal when they name the same target file.
+      */
+     public boolean equals(Object o)
+     {
+         if ( !(o instanceof StreamContext) )
+             return false;
+
+         StreamContext rhs = (StreamContext)o;
+         if ( targetFile_ == null )
+             return rhs.targetFile_ == null;
+         return targetFile_.equals(rhs.targetFile_);
+     }
+
+     /**
+      * Hashes only the target file so that the result agrees with equals().
+      * Hashing toString() would also mix in expectedBytes_, letting two equal
+      * contexts produce different hash codes.
+      */
+     public int hashCode()
+     {
+         return ( targetFile_ == null ) ? 0 : targetFile_.hashCode();
+     }
+
+     /** @return "targetFile:expectedBytes" */
+     public String toString()
+     {
+         return targetFile_ + ":" + expectedBytes_;
+     }
+ }
+
+ /**
+  * Compact serializer for StreamContext: writes the target file as a UTF
+  * string followed by the expected byte count as a long.
+  */
+ public static class StreamContextSerializer implements ICompactSerializer<StreamContext>
+ {
+     public void serialize(StreamContextManager.StreamContext sc, DataOutputStream dos) throws IOException
+     {
+         String file = sc.targetFile_;
+         long bytes = sc.expectedBytes_;
+         dos.writeUTF(file);
+         dos.writeLong(bytes);
+     }
+
+     public StreamContextManager.StreamContext deserialize(DataInputStream dis) throws IOException
+     {
+         /* Arguments are evaluated left to right, so the fields are read in the order they were written. */
+         return new StreamContext(dis.readUTF(), dis.readLong());
+     }
+ }
+
+ /**
+  * Status of one stream: the file, the number of bytes expected and the
+  * action associated with it. The action starts out as DELETE.
+  */
+ public static class StreamStatus
+ {
+     private static ICompactSerializer<StreamStatus> serializer_ = new StreamStatusSerializer();
+
+     /** @return the shared compact serializer for stream statuses */
+     public static ICompactSerializer<StreamStatus> serializer()
+     {
+         return serializer_;
+     }
+
+     private String file_;
+     private long expectedBytes_;
+     private StreamCompletionAction action_ = StreamContextManager.StreamCompletionAction.DELETE;
+
+     /**
+      * @param file          name of the file this status refers to
+      * @param expectedBytes number of bytes expected for the stream
+      */
+     public StreamStatus(String file, long expectedBytes)
+     {
+         this.file_ = file;
+         this.expectedBytes_ = expectedBytes;
+     }
+
+     public String getFile()
+     {
+         return this.file_;
+     }
+
+     public long getExpectedBytes()
+     {
+         return this.expectedBytes_;
+     }
+
+     void setAction(StreamContextManager.StreamCompletionAction action)
+     {
+         this.action_ = action;
+     }
+
+     public StreamContextManager.StreamCompletionAction getAction()
+     {
+         return this.action_;
+     }
+ }
+
+ /**
+  * Compact serializer for StreamStatus: file (UTF string), expected bytes
+  * (long) and the ordinal of the completion action (int), in that order.
+  */
+ public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
+ {
+     public void serialize(StreamStatus streamStatus, DataOutputStream dos) throws IOException
+     {
+         dos.writeUTF(streamStatus.getFile());
+         dos.writeLong(streamStatus.getExpectedBytes());
+         int actionOrdinal = streamStatus.getAction().ordinal();
+         dos.writeInt(actionOrdinal);
+     }
+
+     public StreamStatus deserialize(DataInputStream dis) throws IOException
+     {
+         String file = dis.readUTF();
+         long expectedBytes = dis.readLong();
+         int ordinal = dis.readInt();
+
+         StreamStatus status = new StreamStatus(file, expectedBytes);
+         /* An ordinal that matches no constant leaves the status with its default action. */
+         for ( StreamCompletionAction candidate : StreamCompletionAction.values() )
+         {
+             if ( candidate.ordinal() == ordinal )
+             {
+                 status.setAction(candidate);
+                 break;
+             }
+         }
+         return status;
+     }
+ }
+
+ /**
+  * Wraps a StreamStatus so it can be sent as the body of a Message.
+  */
+ public static class StreamStatusMessage implements Serializable
+ {
+     private static ICompactSerializer<StreamStatusMessage> serializer_ = new StreamStatusMessageSerializer();
+
+     /** @return the shared compact serializer for stream status messages */
+     public static ICompactSerializer<StreamStatusMessage> serializer()
+     {
+         return serializer_;
+     }
+
+     /**
+      * Serializes the given status message and wraps the bytes in a Message
+      * that originates from the local storage endpoint and is addressed to
+      * StorageService.bootStrapTerminateVerbHandler_.
+      *
+      * @param streamStatusMessage message to wrap
+      * @return a Message whose body is a one-element array holding the serialized bytes
+      * @throws IOException if serialization fails
+      */
+     public static Message makeStreamStatusMessage(StreamStatusMessage streamStatusMessage) throws IOException
+     {
+         ByteArrayOutputStream sink = new ByteArrayOutputStream();
+         DataOutputStream out = new DataOutputStream( sink );
+         StreamStatusMessage.serializer().serialize(streamStatusMessage, out);
+         return new Message(
+                 StorageService.getLocalStorageEndPoint(),
+                 "",
+                 StorageService.bootStrapTerminateVerbHandler_,
+                 new Object[]{sink.toByteArray()});
+     }
+
+     protected StreamContextManager.StreamStatus streamStatus_;
+
+     /**
+      * @param streamStatus status carried by this message
+      */
+     public StreamStatusMessage(StreamContextManager.StreamStatus streamStatus)
+     {
+         this.streamStatus_ = streamStatus;
+     }
+
+     public StreamContextManager.StreamStatus getStreamStatus()
+     {
+         return this.streamStatus_;
+     }
+ }
+
+ /**
+  * Compact serializer for StreamStatusMessage; the wire form is exactly that
+  * of the wrapped StreamStatus.
+  */
+ public static class StreamStatusMessageSerializer implements ICompactSerializer<StreamStatusMessage>
+ {
+     public void serialize(StreamStatusMessage streamStatusMessage, DataOutputStream dos) throws IOException
+     {
+         StreamContextManager.StreamStatus wrapped = streamStatusMessage.streamStatus_;
+         StreamStatus.serializer().serialize(wrapped, dos);
+     }
+
+     public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
+     {
+         return new StreamStatusMessage(StreamStatus.serializer().deserialize(dis));
+     }
+ }
+
+ /*
+ * Stream contexts queued per source host, in the order they were added.
+ * getStreamContext() removes the head of a host's list and drops the
+ * entry once the list is empty.
+ */
+ public static Map<String, List<StreamContext>> ctxBag_ = new Hashtable<String, List<StreamContext>>();
+ /*
+ * Stream statuses queued per source host; these are the statuses that need
+ * to be sent back to the source. Filled and drained in step with ctxBag_
+ * by addStreamContext() and getStreamStatus().
+ */
+ public static Map<String, List<StreamStatus>> streamStatusBag_ = new Hashtable<String, List<StreamStatus>>();
+ /*
+ * One callback handler per endpoint, used to notify the app that a stream
+ * from that endpoint has been handled. This is a plain HashMap: the static
+ * accessors in this class are synchronized, but the field is public, so
+ * direct access from elsewhere is not guarded.
+ */
+ public static Map<String, IStreamComplete> streamNotificationHandlers_ = new HashMap<String, IStreamComplete>();
+
+ /**
+  * Removes and returns the oldest stream context queued for the given key.
+  * The key's entry is dropped once its queue becomes empty.
+  *
+  * @param key key the context was registered under
+  * @return the oldest queued context for the key
+  * @throws IllegalStateException if nothing is queued for the key
+  */
+ public synchronized static StreamContext getStreamContext(String key)
+ {
+     List<StreamContext> queued = ctxBag_.get(key);
+     if ( queued == null )
+     {
+         throw new IllegalStateException("Streaming context has not been set.");
+     }
+
+     StreamContext oldest = queued.remove(0);
+     if ( queued.isEmpty() )
+     {
+         ctxBag_.remove(key);
+     }
+     return oldest;
+ }
+
+ /**
+  * Removes and returns the oldest stream status queued for the given key.
+  * The key's entry is dropped once its queue becomes empty.
+  *
+  * @param key key the status was registered under
+  * @return the oldest queued status for the key
+  * @throws IllegalStateException if nothing is queued for the key
+  */
+ public synchronized static StreamStatus getStreamStatus(String key)
+ {
+     List<StreamStatus> queued = streamStatusBag_.get(key);
+     if ( queued == null )
+     {
+         throw new IllegalStateException("Streaming status has not been set.");
+     }
+
+     StreamStatus oldest = queued.remove(0);
+     if ( queued.isEmpty() )
+     {
+         streamStatusBag_.remove(key);
+     }
+     return oldest;
+ }
+
+ /**
+  * Tells whether any stream contexts remain queued for the given key. This
+  * helps determine if the StreamCompletionHandler needs to be invoked for
+  * the data being streamed from a source.
+  *
+  * @param key key the contexts were registered under
+  * @return true when no context list is present for the key
+  */
+ public synchronized static boolean isDone(String key)
+ {
+     List<StreamContext> outstanding = ctxBag_.get(key);
+     return outstanding == null;
+ }
+
+ /**
+ * Looks up the completion handler registered for the given key.
+ *
+ * @param key key the handler was registered under
+ * @return the registered handler, or null if there is none
+ */
+ public synchronized static IStreamComplete getStreamCompletionHandler(String key)
+ {
+ return streamNotificationHandlers_.get(key);
+ }
+
+ /**
+ * Unregisters the completion handler for the given key, if one is registered.
+ *
+ * @param key key the handler was registered under
+ */
+ public synchronized static void removeStreamCompletionHandler(String key)
+ {
+ streamNotificationHandlers_.remove(key);
+ }
+
+ /**
+ * Registers a completion handler for the given key, replacing any handler
+ * previously registered under the same key.
+ *
+ * @param key key to register the handler under
+ * @param streamComplete handler to register
+ */
+ public synchronized static void registerStreamCompletionHandler(String key, IStreamComplete streamComplete)
+ {
+ streamNotificationHandlers_.put(key, streamComplete);
+ }
+
+ /**
+  * Queues a stream context and its matching status under the given key.
+  * Both are appended to the end of the key's queues, which are created on
+  * first use.
+  *
+  * @param key           key to queue under
+  * @param streamContext context of the stream
+  * @param streamStatus  status recorded for that stream context
+  */
+ public synchronized static void addStreamContext(String key, StreamContext streamContext, StreamStatus streamStatus)
+ {
+     /* Record the stream context */
+     List<StreamContext> contexts = ctxBag_.get(key);
+     if ( contexts == null )
+     {
+         contexts = new ArrayList<StreamContext>();
+         ctxBag_.put(key, contexts);
+     }
+     contexts.add(streamContext);
+
+     /* Record the stream status for this stream context */
+     List<StreamStatus> statuses = streamStatusBag_.get(key);
+     if ( statuses == null )
+     {
+         statuses = new ArrayList<StreamStatus>();
+         streamStatusBag_.put(key, statuses);
+     }
+     statuses.add( streamStatus );
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/io/TcpReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/io/TcpReader.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/io/TcpReader.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/io/TcpReader.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+
+import org.apache.cassandra.net.ProtocolHeader;
+import org.apache.cassandra.net.TcpConnection;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpReader
+{
+    /**
+     * Keys under which state objects are cached in a reader's state map.
+     */
+    public enum TcpReaderState
+    {
+        START,
+        PREAMBLE,
+        PROTOCOL,
+        CONTENT_LENGTH,
+        CONTENT,
+        CONTENT_STREAM,
+        DONE
+    }
+
+    /* State objects cached for this reader, looked up by TcpReaderState. */
+    private Map<TcpReaderState, StartState> stateMap_ = new HashMap<TcpReaderState, StartState>();
+    /* Connection that supplies the socket channel returned by getStream(). */
+    private TcpConnection connection_;
+    /* State driven by read(); the read loop stops once this is null. */
+    private StartState socketState_;
+    /* Created on the first morphState() call, null before that. */
+    private ProtocolHeader protocolHeader_;
+
+    /**
+     * Creates a reader bound to the given connection. No state is selected
+     * until resetState() or morphState() is called.
+     */
+    public TcpReader(TcpConnection connection)
+    {
+        connection_ = connection;
+    }
+
+    /**
+     * @return the state object cached under the given key, or null if none was stored
+     */
+    public StartState getSocketState(TcpReaderState state)
+    {
+        return stateMap_.get(state);
+    }
+
+    /**
+     * Caches a state object under the given key, replacing any previous one.
+     */
+    public void putSocketState(TcpReaderState state, StartState socketState)
+    {
+        stateMap_.put(state, socketState);
+    }
+
+    /**
+     * Makes the state cached under PREAMBLE the current state. When nothing
+     * is cached yet, a ProtocolState is created, cached under PREAMBLE and
+     * then made current.
+     */
+    public void resetState()
+    {
+        StartState initial = stateMap_.get(TcpReaderState.PREAMBLE);
+        if ( initial != null )
+        {
+            socketState_ = initial;
+            return;
+        }
+
+        initial = new ProtocolState(this);
+        stateMap_.put(TcpReaderState.PREAMBLE, initial);
+        socketState_ = initial;
+    }
+
+    /**
+     * Switches the current state to the given one and makes sure a
+     * ProtocolHeader exists afterwards.
+     */
+    public void morphState(StartState state)
+    {
+        socketState_ = state;
+        if ( protocolHeader_ == null )
+        {
+            protocolHeader_ = new ProtocolHeader();
+        }
+    }
+
+    /**
+     * @return the protocol header, or null if morphState() has not been called yet
+     */
+    public ProtocolHeader getProtocolHeader()
+    {
+        return protocolHeader_;
+    }
+
+    /**
+     * @return the socket channel of the underlying connection
+     */
+    public SocketChannel getStream()
+    {
+        return connection_.getSocketChannel();
+    }
+
+    /**
+     * Repeatedly asks the current state to read until either there is no
+     * current state any more or a state throws ReadNotCompleteException.
+     *
+     * @return the bytes handed back by the last state read that completed,
+     *         or an empty array when no read completed
+     * @throws IOException propagated from the state's read
+     */
+    public byte[] read() throws IOException
+    {
+        byte[] lastRead = new byte[0];
+        try
+        {
+            while ( socketState_ != null )
+            {
+                lastRead = socketState_.read();
+            }
+        }
+        catch ( ReadNotCompleteException ignored )
+        {
+            /* Deliberately not propagated: hand back what was read so far. */
+        }
+        return lastRead;
+    }
+
+    /**
+     * Small manual check: stores a few state objects in a map and prints the
+     * class names of the ones stored under CONTENT and CONTENT_LENGTH.
+     */
+    public static void main(String[] args) throws Throwable
+    {
+        Map<TcpReaderState, StartState> states = new HashMap<TcpReaderState, StartState>();
+        states.put(TcpReaderState.CONTENT, new ContentState(null, 10));
+        states.put(TcpReaderState.START, new ProtocolState(null));
+        states.put(TcpReaderState.CONTENT_LENGTH, new ContentLengthState(null));
+
+        TcpReaderState[] lookups = { TcpReaderState.CONTENT, TcpReaderState.CONTENT_LENGTH };
+        for ( TcpReaderState lookup : lookups )
+        {
+            StartState found = states.get(lookup);
+            System.out.println( found.getClass().getName() );
+        }
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/IMessageSink.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/IMessageSink.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/IMessageSink.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.sink;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IMessageSink
+{
+    /**
+     * Handles a message on its way through the sink chain.
+     *
+     * @param message the message handed to this sink
+     * @return the message to hand on; SinkManager stops running further
+     *         sinks and returns null as soon as a sink returns null
+     */
+    Message handleMessage(Message message);
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/SinkManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/SinkManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/sink/SinkManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.sink;
+
+import java.util.*;
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SinkManager
+{
+    /* Registered sinks, kept in the order in which they were added. */
+    private static LinkedList<IMessageSink> messageSinks_ = new LinkedList<IMessageSink>();
+
+    /**
+     * @return true when at least one sink is registered
+     */
+    public static boolean isInitialized()
+    {
+        return !messageSinks_.isEmpty();
+    }
+
+    /**
+     * Registers a sink behind all sinks registered so far.
+     */
+    public static void addMessageSink(IMessageSink ms)
+    {
+        /* LinkedList.add appends at the tail, exactly like addLast. */
+        messageSinks_.add(ms);
+    }
+
+    /**
+     * Removes every registered sink.
+     */
+    public static void clearSinks()
+    {
+        messageSinks_.clear();
+    }
+
+    /**
+     * Runs the message through the sinks from first registered to last
+     * registered. Each sink receives the result of the previous one.
+     *
+     * @return the message returned by the last sink, the input message when
+     *         no sink is registered, or null as soon as any sink returns null
+     */
+    public static Message processClientMessageSink(Message message)
+    {
+        Message current = message;
+        for ( IMessageSink sink : messageSinks_ )
+        {
+            current = sink.handleMessage(current);
+            if ( current == null )
+            {
+                return null;
+            }
+        }
+        return current;
+    }
+
+    /**
+     * Runs the message through the sinks from last registered to first
+     * registered. Each sink receives the result of the previous one.
+     *
+     * @return the message returned by the last sink visited, the input message
+     *         when no sink is registered, or null as soon as any sink returns null
+     */
+    public static Message processServerMessageSink(Message message)
+    {
+        Message current = message;
+        /* Start the cursor past the tail so previous() walks backwards. */
+        for ( ListIterator<IMessageSink> cursor = messageSinks_.listIterator(messageSinks_.size()); cursor.hasPrevious(); )
+        {
+            current = cursor.previous().handleMessage(current);
+            if ( current == null )
+            {
+                return null;
+            }
+        }
+        return current;
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/procedures/GroovyScriptRunner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/procedures/GroovyScriptRunner.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/procedures/GroovyScriptRunner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/procedures/GroovyScriptRunner.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,13 @@
+package org.apache.cassandra.procedures;
+
+import groovy.lang.GroovyShell;
+
+public class GroovyScriptRunner
+{
+    /*
+     * One shell instance shared by every caller.
+     * NOTE(review): confirm that concurrent evaluate() calls on a shared
+     * GroovyShell are safe if this is ever invoked from several threads.
+     */
+    private static final GroovyShell groovyShell_ = new GroovyShell();
+
+    /**
+     * Evaluates a Groovy script and returns the textual form of its result.
+     *
+     * NOTE(review): the script text is executed as-is. Callers must only
+     * pass scripts from trusted sources.
+     *
+     * @param script Groovy source to evaluate
+     * @return the result converted to a String; the literal "null" when the
+     *         script evaluates to null (previously a NullPointerException)
+     */
+    public static String evaluateString(String script)
+    {
+        Object result = groovyShell_.evaluate(script);
+        /* valueOf tolerates a null result, unlike result.toString(). */
+        return String.valueOf(result);
+    }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/service/BootstrapAndLbHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/BootstrapAndLbHelper.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/BootstrapAndLbHelper.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/BootstrapAndLbHelper.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,64 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.SSTable;
+import org.apache.log4j.Logger;
+
+public final class BootstrapAndLbHelper
+{
+    private static final Logger logger_ = Logger.getLogger(BootstrapAndLbHelper.class);
+
+    /**
+     * Fetches the keys reported by SSTable.getIndexedKeys() and removes, in
+     * place, every key for which StorageService.isPrimary() returns false.
+     *
+     * @return the same list instance, now holding only the primary keys
+     */
+    private static List<String> getIndexedPrimaryKeys()
+    {
+        List<String> indexedPrimaryKeys = SSTable.getIndexedKeys();
+        Iterator<String> it = indexedPrimaryKeys.iterator();
+
+        while ( it.hasNext() )
+        {
+            String key = it.next();
+            if ( !StorageService.instance().isPrimary(key) )
+            {
+                /* Iterator.remove keeps the iteration valid while filtering. */
+                it.remove();
+            }
+        }
+        return indexedPrimaryKeys;
+    }
+
+    /**
+     * Given the number of keys that need to be transferred, say 1000,
+     * and given the smallest key stored, we need the hash of the 1000th
+     * key greater than the smallest key in the sorted order in the primary
+     * range.
+     *
+     * The key is picked from the indexed primary keys at position
+     * keyCount / SSTable.indexInterval(); a position past the end falls
+     * back to the last indexed primary key.
+     *
+     * @param keyCount number of keys after which token is required.
+     * @return token, i.e. the hash of the selected key.
+     * @throws IllegalStateException if there are no indexed primary keys
+     *         (previously this surfaced as an index-out-of-bounds failure).
+     */
+    public static BigInteger getTokenBasedOnPrimaryCount(int keyCount)
+    {
+        List<String> indexedPrimaryKeys = getIndexedPrimaryKeys();
+        int index = keyCount / SSTable.indexInterval();
+        if ( indexedPrimaryKeys.isEmpty() )
+        {
+            /* Without this guard the fallback below would call get(-1). */
+            throw new IllegalStateException("No indexed primary keys are available to compute a token.");
+        }
+        int lastIndex = indexedPrimaryKeys.size() - 1;
+        String key = indexedPrimaryKeys.get( Math.min(index, lastIndex) );
+        logger_.debug("Hashing key " + key + " ...");
+        return StorageService.instance().hash(key);
+    }
+}