You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC

svn commit: r749207 [5/12] - in /incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/ net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/

Added: incubator/cassandra/src/org/apache/cassandra/net/io/FastSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/FastSerializer.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/FastSerializer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/FastSerializer.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class FastSerializer implements ISerializer
+{ 
+    public byte[] serialize(Message message) throws IOException
+    {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        Message.serializer().serialize(message, buffer);
+        return buffer.getData();
+    }
+    
+    public Message deserialize(byte[] bytes) throws IOException
+    {
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(bytes, bytes.length);
+        return Message.serializer().deserialize(bufIn);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/ISerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ISerializer.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ISerializer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ISerializer.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.Message;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Converts a Message to a byte array and back again.
+ * FastSerializer is one implementation in this package.
+ */
+public interface ISerializer
+{
+    /**
+     * Encodes the given message as a byte array.
+     *
+     * @param message the message to encode
+     * @return the encoded form of the message
+     * @throws IOException if encoding fails
+     */
+    public byte[] serialize(Message message) throws IOException;
+
+    /**
+     * Rebuilds a message from its encoded form.
+     *
+     * @param bytes the encoded message
+     * @return the decoded message
+     * @throws IOException if decoding fails
+     */
+    public Message deserialize(byte[] bytes) throws IOException;
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/IStreamComplete.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/IStreamComplete.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/IStreamComplete.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Callback interface for code that wants to be told when a stream has been
+ * completely handled.
+ */
+public interface IStreamComplete
+{
+    /**
+     * If this callback is registered with the StreamContextManager, it is
+     * called when the stream from a host has been completely handled.
+     *
+     * @param from identifies the source of the stream (presumably the host
+     *             key used with StreamContextManager -- confirm with callers)
+     * @param streamContext the context of the stream that was handled
+     * @param streamStatus the status recorded for that stream
+     * @throws IOException if the handler fails while reacting to completion
+     */
+    public void onStreamCompletion(String from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolHeaderState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolHeaderState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolHeaderState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolHeaderState.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Reader state that consumes the four-byte protocol header, copies its
+ * fields into the reader's ProtocolHeader and then hands control to either
+ * the content-stream or the content-length state.
+ */
+public class ProtocolHeaderState extends StartState
+{
+    /* Collects the four header bytes as they arrive from the socket. */
+    private ByteBuffer buffer_;
+
+    public ProtocolHeaderState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(4);
+    }
+
+    /**
+     * Attempts to fill the header buffer from the socket; see doRead().
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Decodes the header once all four bytes are available and switches the
+     * reader to the next state.
+     *
+     * @throws IOException if the version carried in the header is greater
+     *                     than MessagingService.getVersion()
+     */
+    public void morphState() throws IOException
+    {
+        byte[] rawHeader = buffer_.array();
+        int header = MessagingService.byteArrayToInt(rawHeader);
+
+        /* serializer type */
+        int serializerType = MessagingService.getBits(header, 1, 2);
+        stream_.getProtocolHeader().serializerType_ = serializerType;
+
+        /* streaming flag; also switches the messaging service into streaming mode */
+        int streamBit = MessagingService.getBits(header, 3, 1);
+        boolean streaming = (streamBit == 1);
+        stream_.getProtocolHeader().isStreamingMode_ = streaming;
+        if ( streaming )
+            MessagingService.setStreamingMode(true);
+
+        /* listening flag */
+        int listeningBit = MessagingService.getBits(header, 4, 1);
+        stream_.getProtocolHeader().isListening_ = (listeningBit == 1);
+
+        /* protocol version */
+        int peerVersion = MessagingService.getBits(header, 15, 8);
+        stream_.getProtocolHeader().version_ = peerVersion;
+
+        if ( peerVersion > MessagingService.getVersion() )
+        {
+            throw new IOException("Invalid version in message. Scram.");
+        }
+
+        /* Reuse the cached state object for this reader if there is one. */
+        TcpReader.TcpReaderState nextKey = streaming
+                                         ? TcpReader.TcpReaderState.CONTENT_STREAM
+                                         : TcpReader.TcpReaderState.CONTENT_LENGTH;
+        StartState nextState = stream_.getSocketState(nextKey);
+        if ( nextState == null )
+        {
+            if ( streaming )
+                nextState = new ContentStreamState(stream_);
+            else
+                nextState = new ContentLengthState(stream_);
+            stream_.putSocketState( nextKey, nextState );
+        }
+        stream_.morphState( nextState );
+        /* make the buffer ready for the next header */
+        buffer_.clear();
+    }
+
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ProtocolHeaderState");
+    }
+}
+
+

Added: incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ProtocolState.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Reader state that consumes the 16-byte protocol preamble, validates it
+ * and then hands control to the ProtocolHeaderState.
+ */
+public class ProtocolState extends StartState
+{
+    /* Collects the 16 preamble bytes as they arrive from the socket. */
+    private ByteBuffer buffer_;
+
+    public ProtocolState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(16);
+    }
+
+    /**
+     * Attempts to fill the preamble buffer from the socket; see doRead().
+     */
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {
+        return doRead(buffer_);
+    }
+
+    /**
+     * Validates the preamble and switches the reader to the header state.
+     *
+     * @throws IOException if MessagingService rejects the preamble
+     */
+    public void morphState() throws IOException
+    {
+        byte[] preamble = buffer_.array();
+        if ( !MessagingService.isProtocolValid(preamble) )
+        {
+            throw new IOException("Invalid protocol header. The preamble seems to be messed up.");
+        }
+
+        /* The header state is cached on the reader under the PROTOCOL key. */
+        StartState headerState = stream_.getSocketState(TcpReader.TcpReaderState.PROTOCOL);
+        if ( headerState == null )
+        {
+            headerState = new ProtocolHeaderState(stream_);
+            stream_.putSocketState( TcpReader.TcpReaderState.PROTOCOL, headerState );
+        }
+        stream_.morphState( headerState );
+        /* make the buffer ready for the next preamble */
+        buffer_.clear();
+    }
+
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ProtocolState");
+    }
+}
+

Added: incubator/cassandra/src/org/apache/cassandra/net/io/ReadNotCompleteException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/ReadNotCompleteException.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/ReadNotCompleteException.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/ReadNotCompleteException.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+/**
+ * Signals that a read did not fill the buffer it was reading into.
+ * StartState.doRead() throws it when bytes are still outstanding after a
+ * read from the socket channel.
+ *
+ * Originally created by lakshman on Aug 22, 2005.
+ */
+public class ReadNotCompleteException extends Exception
+{
+    /* Package-private: only code in this package can create this exception. */
+    ReadNotCompleteException(String message)
+    {
+        super(message);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/SerializerAttribute.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/SerializerAttribute.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/SerializerAttribute.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/SerializerAttribute.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.lang.annotation.*;
+
+/**
+ * Annotation that carries a SerializerType. It is retained at runtime, so
+ * the value can be read reflectively.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SerializerAttribute
+{
+    /* The serializer type attached to the annotated element. */
+    SerializerType value();
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/SerializerType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/SerializerType.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/SerializerType.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/SerializerType.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+/**
+ * Kinds of serializer, as carried by the SerializerAttribute annotation.
+ * NOTE(review): the constant names suggest wire formats; confirm how each
+ * one is mapped to an ISerializer implementation.
+ */
+public enum SerializerType
+{
+    BINARY,
+    JAVA,
+    XML,
+    JSON
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/StartState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/StartState.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/StartState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/StartState.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Base class for the states a TcpReader moves through while reading from
+ * its socket channel. Each state reads into its own buffer and, once the
+ * buffer is full, moves the reader on via morphState().
+ */
+public abstract class StartState
+{
+    /* The reader this state belongs to; provides the socket channel. */
+    protected TcpReader stream_;
+
+    public StartState(TcpReader stream)
+    {
+        stream_ = stream;
+    }
+
+    /* Reads this state's portion of the input from the socket. */
+    public abstract byte[] read() throws IOException, ReadNotCompleteException;
+    /* Invoked once this state's buffer is full, to move on to the next state. */
+    public abstract void morphState() throws IOException;
+    /* Supplies state-specific context; states may reject the call. */
+    public abstract void setContextData(Object data);
+
+    /**
+     * Performs one read from the reader's socket channel into the buffer.
+     * When the buffer is full afterwards, morphState() is invoked.
+     *
+     * @param buffer the buffer to fill; its remaining() count is the number
+     *               of bytes still expected
+     * @return an empty array, once the buffer is full and morphState() has run
+     * @throws IOException if the channel reports end-of-stream while bytes
+     *                     are still expected, or if the read itself fails
+     * @throws ReadNotCompleteException if the buffer is not yet full
+     */
+    protected byte[] doRead(ByteBuffer buffer) throws IOException, ReadNotCompleteException
+    {
+        SocketChannel socketChannel = stream_.getStream();
+        int bytesRead = socketChannel.read(buffer);
+        /* read() returns -1 at end-of-stream: the peer closed before sending everything */
+        if ( bytesRead == -1 && buffer.hasRemaining() )
+        {
+            throw new IOException("Reached EOF or something bizarre occurred. Reading from: " + socketChannel.socket().getInetAddress() + " BufferSizeRemaining: " + buffer.remaining());
+        }
+        if ( buffer.hasRemaining() )
+        {
+            throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+        }
+        morphState();
+        return new byte[0];
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/StreamContextManager.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/StreamContextManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/StreamContextManager.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.dht.BootstrapInitiateMessage;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class StreamContextManager
+{
+    private static Logger logger_ = Logger.getLogger(StreamContextManager.class);
+    
+    /*
+     * Action recorded on a StreamStatus. A new StreamStatus starts out
+     * with DELETE.
+    */
+    public static enum StreamCompletionAction
+    {
+        DELETE,
+        STREAM
+    }
+    
+    /**
+     * Describes one incoming stream: the file it is written to and the
+     * number of bytes expected. Two contexts are equal when they name the
+     * same target file; the expected byte count is not part of equality.
+     */
+    public static class StreamContext implements Serializable
+    {
+        private static Logger logger_ = Logger.getLogger(StreamContextManager.StreamContext.class);
+        private static ICompactSerializer<StreamContext> serializer_;
+
+        static
+        {
+            serializer_ = new StreamContextSerializer();
+        }
+
+        /* Returns the shared serializer for stream contexts. */
+        public static ICompactSerializer<StreamContext> serializer()
+        {
+            return serializer_;
+        }
+
+        private String targetFile_;
+        private long expectedBytes_;
+
+        public StreamContext(String targetFile, long expectedBytes)
+        {
+            targetFile_ = targetFile;
+            expectedBytes_ = expectedBytes;
+        }
+
+        public String getTargetFile()
+        {
+            return targetFile_;
+        }
+
+        public void setTargetFile(String file)
+        {
+            targetFile_ = file;
+        }
+
+        public long getExpectedBytes()
+        {
+            return expectedBytes_;
+        }
+
+        /**
+         * Equality is decided by the target file alone.
+         */
+        public boolean equals(Object o)
+        {
+            if ( this == o )
+                return true;
+            if ( !(o instanceof StreamContext) )
+                return false;
+
+            StreamContext rhs = (StreamContext)o;
+            if ( targetFile_ == null )
+                return rhs.targetFile_ == null;
+            return targetFile_.equals(rhs.targetFile_);
+        }
+
+        /**
+         * Hashes only the target file, the one field equals() compares, so
+         * that equal contexts always produce equal hash codes.
+         */
+        public int hashCode()
+        {
+            return ( targetFile_ == null ) ? 0 : targetFile_.hashCode();
+        }
+
+        public String toString()
+        {
+            return targetFile_ + ":" + expectedBytes_;
+        }
+    }
+    
+    /**
+     * Compact serializer for StreamContext: the target file as a UTF string
+     * followed by the expected byte count as a long.
+     */
+    public static class StreamContextSerializer implements ICompactSerializer<StreamContext>
+    {
+        /* Writes the target file, then the expected byte count. */
+        public void serialize(StreamContextManager.StreamContext sc, DataOutputStream dos) throws IOException
+        {
+            String file = sc.targetFile_;
+            long bytes = sc.expectedBytes_;
+            dos.writeUTF(file);
+            dos.writeLong(bytes);
+        }
+
+        /* Reads the fields back in the order serialize() wrote them. */
+        public StreamContextManager.StreamContext deserialize(DataInputStream dis) throws IOException
+        {
+            /* arguments are evaluated left to right: file first, then byte count */
+            return new StreamContext(dis.readUTF(), dis.readLong());
+        }
+    }
+    
+    /**
+     * Status of a single stream: the file, the number of bytes expected and
+     * the completion action. The action starts out as DELETE.
+     */
+    public static class StreamStatus
+    {
+        private static ICompactSerializer<StreamStatus> serializer_ = new StreamStatusSerializer();
+
+        /* Returns the shared serializer for stream statuses. */
+        public static ICompactSerializer<StreamStatus> serializer()
+        {
+            return serializer_;
+        }
+
+        private String file_;
+        private long expectedBytes_;
+        private StreamCompletionAction action_;
+
+        public StreamStatus(String file, long expectedBytes)
+        {
+            this.file_ = file;
+            this.expectedBytes_ = expectedBytes;
+            this.action_ = StreamContextManager.StreamCompletionAction.DELETE;
+        }
+
+        public String getFile()
+        {
+            return this.file_;
+        }
+
+        public long getExpectedBytes()
+        {
+            return this.expectedBytes_;
+        }
+
+        /* Package-private: the action is only changed from within this package. */
+        void setAction(StreamContextManager.StreamCompletionAction action)
+        {
+            this.action_ = action;
+        }
+
+        public StreamContextManager.StreamCompletionAction getAction()
+        {
+            return this.action_;
+        }
+    }
+    
+    /**
+     * Compact serializer for StreamStatus: the file as a UTF string, the
+     * expected byte count as a long, and the action's ordinal as an int.
+     */
+    public static class StreamStatusSerializer implements ICompactSerializer<StreamStatus>
+    {
+        /* Writes file, expected byte count and action ordinal, in that order. */
+        public void serialize(StreamStatus streamStatus, DataOutputStream dos) throws IOException
+        {
+            dos.writeUTF(streamStatus.getFile());
+            dos.writeLong(streamStatus.getExpectedBytes());
+            dos.writeInt(streamStatus.getAction().ordinal());
+        }
+
+        /**
+         * Reads a StreamStatus back in the order serialize() wrote it.
+         *
+         * @throws IOException if the stream cannot be read, or if the
+         *                     ordinal does not name a StreamCompletionAction
+         */
+        public StreamStatus deserialize(DataInputStream dis) throws IOException
+        {
+            String targetFile = dis.readUTF();
+            long expectedBytes = dis.readLong();
+            StreamStatus streamStatus = new StreamStatus(targetFile, expectedBytes);
+
+            int ordinal = dis.readInt();
+            StreamCompletionAction[] actions = StreamCompletionAction.values();
+            /*
+             * Reject unknown ordinals instead of silently keeping the
+             * constructor default (DELETE) for data we do not understand.
+            */
+            if ( ordinal < 0 || ordinal >= actions.length )
+            {
+                throw new IOException("Unknown stream completion action ordinal: " + ordinal);
+            }
+            streamStatus.setAction(actions[ordinal]);
+
+            return streamStatus;
+        }
+    }
+    
+    /**
+     * Wraps a StreamStatus so it can be sent as the body of a Message.
+     *
+     * NOTE(review): this class is Serializable but StreamStatus is not, so
+     * Java serialization of an instance would presumably fail -- confirm
+     * whether only the compact serializer is ever used.
+     */
+    public static class StreamStatusMessage implements Serializable
+    {
+        private static ICompactSerializer<StreamStatusMessage> serializer_ = new StreamStatusMessageSerializer();
+
+        /* Returns the shared serializer for stream status messages. */
+        public static ICompactSerializer<StreamStatusMessage> serializer()
+        {
+            return serializer_;
+        }
+
+        /**
+         * Serializes the given status message into a byte array and wraps it
+         * in a Message from the local storage endpoint, addressed to the
+         * bootstrap-terminate verb handler.
+         *
+         * @throws IOException if serialization fails
+         */
+        public static Message makeStreamStatusMessage(StreamStatusMessage streamStatusMessage) throws IOException
+        {
+            ByteArrayOutputStream byteSink = new ByteArrayOutputStream();
+            DataOutputStream dataSink = new DataOutputStream( byteSink );
+            StreamStatusMessage.serializer().serialize(streamStatusMessage, dataSink);
+            return new Message(StorageService.getLocalStorageEndPoint(),
+                               "",
+                               StorageService.bootStrapTerminateVerbHandler_,
+                               new Object[]{byteSink.toByteArray()});
+        }
+
+        protected StreamContextManager.StreamStatus streamStatus_;
+
+        public StreamStatusMessage(StreamContextManager.StreamStatus streamStatus)
+        {
+            this.streamStatus_ = streamStatus;
+        }
+
+        public StreamContextManager.StreamStatus getStreamStatus()
+        {
+            return this.streamStatus_;
+        }
+    }
+    
+    /**
+     * Compact serializer for StreamStatusMessage; the encoded form is just
+     * the encoded form of the wrapped StreamStatus.
+     */
+    public static class StreamStatusMessageSerializer implements ICompactSerializer<StreamStatusMessage>
+    {
+        /* Delegates to the StreamStatus serializer for the wrapped status. */
+        public void serialize(StreamStatusMessage streamStatusMessage, DataOutputStream dos) throws IOException
+        {
+            StreamContextManager.StreamStatus wrapped = streamStatusMessage.streamStatus_;
+            StreamStatus.serializer().serialize(wrapped, dos);
+        }
+
+        /* Reads a StreamStatus and wraps it in a new message. */
+        public StreamStatusMessage deserialize(DataInputStream dis) throws IOException
+        {
+            return new StreamStatusMessage(StreamStatus.serializer().deserialize(dis));
+        }
+    }
+        
+    /*
+     * Stream contexts keyed by the host that is the source of the stream.
+     * Contexts for a host are consumed in the order they were added.
+    */
+    public static Map<String, List<StreamContext>> ctxBag_ = new Hashtable<String, List<StreamContext>>();  
+    /*
+     * Status of the streams that need to be sent back to the source,
+     * keyed the same way as the stream contexts.
+    */
+    public static Map<String, List<StreamStatus>> streamStatusBag_ = new Hashtable<String, List<StreamStatus>>();
+    /*
+     * One callback handler per endpoint, used to notify the application that
+     * a stream from that endpoint has been handled. This is a plain HashMap;
+     * the accessor methods of this class are synchronized.
+    */
+    public static Map<String, IStreamComplete> streamNotificationHandlers_ = new HashMap<String, IStreamComplete>();
+    
+    /*
+     * Removes and returns the oldest stream context recorded for the key.
+     * The key's entry is dropped once its last context has been handed out.
+     * Throws IllegalStateException if nothing is recorded for the key.
+    */
+    public synchronized static StreamContext getStreamContext(String key)
+    {
+        List<StreamContext> pending = ctxBag_.get(key);
+        if ( pending == null )
+        {
+            throw new IllegalStateException("Streaming context has not been set.");
+        }
+        StreamContext oldest = pending.remove(0);
+        if ( pending.isEmpty() )
+        {
+            ctxBag_.remove(key);
+        }
+        return oldest;
+    }
+    
+    /*
+     * Removes and returns the oldest stream status recorded for the key.
+     * The key's entry is dropped once its last status has been handed out.
+     * Throws IllegalStateException if nothing is recorded for the key.
+    */
+    public synchronized static StreamStatus getStreamStatus(String key)
+    {
+        List<StreamStatus> pending = streamStatusBag_.get(key);
+        if ( pending == null )
+        {
+            throw new IllegalStateException("Streaming status has not been set.");
+        }
+        StreamStatus oldest = pending.remove(0);
+        if ( pending.isEmpty() )
+        {
+            streamStatusBag_.remove(key);
+        }
+        return oldest;
+    }
+    
+    /*
+     * This method helps determine if the StreamCompletionHandler needs
+     * to be invoked for the data being streamed from a source.
+     * Returns true when no stream context is recorded for the key any more.
+    */
+    public synchronized static boolean isDone(String key)
+    {
+        return (ctxBag_.get(key) == null);
+    }
+    
+    /*
+     * Returns the completion handler registered for the key, or null if
+     * none is registered.
+    */
+    public synchronized static IStreamComplete getStreamCompletionHandler(String key)
+    {
+        return streamNotificationHandlers_.get(key);
+    }
+    
+    /* Removes the completion handler registered for the key, if any. */
+    public synchronized static void removeStreamCompletionHandler(String key)
+    {
+        streamNotificationHandlers_.remove(key);
+    }
+    
+    /*
+     * Registers the completion handler for the key, replacing any handler
+     * registered for it before.
+    */
+    public synchronized static void registerStreamCompletionHandler(String key, IStreamComplete streamComplete)
+    {
+        streamNotificationHandlers_.put(key, streamComplete);
+    }
+    
+    /*
+     * Records a stream context and its matching stream status under the
+     * key, creating the per-key lists on first use.
+    */
+    public synchronized static void addStreamContext(String key, StreamContext streamContext, StreamStatus streamStatus)
+    {
+        /* Append the context to the contexts recorded for this key. */
+        List<StreamContext> contexts = ctxBag_.get(key);
+        if ( contexts == null )
+        {
+            contexts = new ArrayList<StreamContext>();
+            ctxBag_.put(key, contexts);
+        }
+        contexts.add(streamContext);
+
+        /* Append the matching status to the statuses recorded for this key. */
+        List<StreamStatus> statuses = streamStatusBag_.get(key);
+        if ( statuses == null )
+        {
+            statuses = new ArrayList<StreamStatus>();
+            streamStatusBag_.put(key, statuses);
+        }
+        statuses.add(streamStatus);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/io/TcpReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/io/TcpReader.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/io/TcpReader.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/io/TcpReader.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+
+import org.apache.cassandra.net.ProtocolHeader;
+import org.apache.cassandra.net.TcpConnection;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpReader
+{
+    /** Keys under which state objects are cached in the reader's state map. */
+    public enum TcpReaderState
+    {
+        START,
+        PREAMBLE,
+        PROTOCOL,
+        CONTENT_LENGTH,
+        CONTENT,
+        CONTENT_STREAM,
+        DONE
+    }
+
+    /* Cache of state objects keyed by the reader state they were stored under. */
+    private Map<TcpReaderState, StartState> stateMap_ = new HashMap<TcpReaderState, StartState>();
+    /* Connection whose socket channel getStream() hands out. */
+    private TcpConnection connection_;
+    /* State that read() currently delegates to; null means nothing to read with. */
+    private StartState socketState_;
+    /* Created lazily by the first morphState() call. */
+    private ProtocolHeader protocolHeader_;
+
+    /**
+     * @param connection connection this reader pulls its socket channel from.
+     */
+    public TcpReader(TcpConnection connection)
+    {
+        connection_ = connection;
+    }
+
+    /**
+     * @param state key to look up.
+     * @return the state object cached under the key, or null if none is cached.
+     */
+    public StartState getSocketState(TcpReaderState state)
+    {
+        return stateMap_.get(state);
+    }
+
+    /**
+     * Caches a state object under the given key, replacing any previous one.
+     *
+     * @param state key to cache under.
+     * @param socketState state object to cache.
+     */
+    public void putSocketState(TcpReaderState state, StartState socketState)
+    {
+        stateMap_.put(state, socketState);
+    }
+
+    /**
+     * Makes the state cached under PREAMBLE the current state. When nothing
+     * is cached there yet, a ProtocolState bound to this reader is created,
+     * cached under PREAMBLE and made current.
+     */
+    public void resetState()
+    {
+        StartState cached = stateMap_.get(TcpReaderState.PREAMBLE);
+        if ( cached != null )
+        {
+            socketState_ = cached;
+            return;
+        }
+
+        StartState fresh = new ProtocolState(this);
+        stateMap_.put(TcpReaderState.PREAMBLE, fresh);
+        socketState_ = fresh;
+    }
+
+    /**
+     * Switches the current state and makes sure a protocol header exists.
+     *
+     * @param state state that subsequent reads delegate to; may be null,
+     *              which makes read() stop looping.
+     */
+    public void morphState(StartState state)
+    {
+        socketState_ = state;
+        if ( protocolHeader_ == null )
+        {
+            protocolHeader_ = new ProtocolHeader();
+        }
+    }
+
+    /**
+     * @return the protocol header; null until morphState() has been called
+     *         at least once.
+     */
+    public ProtocolHeader getProtocolHeader()
+    {
+        return protocolHeader_;
+    }
+
+    /**
+     * @return the socket channel of the underlying connection.
+     */
+    public SocketChannel getStream()
+    {
+        return connection_.getSocketChannel();
+    }
+
+    /**
+     * Keeps delegating to the current state until there is no current state
+     * or a state reports that its read is not complete.
+     * NOTE(review): termination relies on the states changing the current
+     * state (presumably through morphState()) -- confirm in the state classes.
+     *
+     * @return the bytes produced by the last read that completed, or an
+     *         empty array if no read completed.
+     * @throws IOException propagated from the current state's read.
+     */
+    public byte[] read() throws IOException
+    {
+        byte[] lastRead = new byte[0];
+        try
+        {
+            while ( socketState_ != null )
+            {
+                lastRead = socketState_.read();
+            }
+        }
+        catch ( ReadNotCompleteException incomplete )
+        {
+            /* Not enough data yet: stop here and return what the last completed read gave us. */
+        }
+        return lastRead;
+    }
+
+    /**
+     * Small manual check: stores three states in a map and prints the class
+     * names of the ones stored under CONTENT and CONTENT_LENGTH.
+     */
+    public static void main(String[] args) throws Throwable
+    {
+        Map<TcpReaderState, StartState> states = new HashMap<TcpReaderState, StartState>();
+        states.put(TcpReaderState.CONTENT, new ContentState(null, 10));
+        states.put(TcpReaderState.START, new ProtocolState(null));
+        states.put(TcpReaderState.CONTENT_LENGTH, new ContentLengthState(null));
+
+        TcpReaderState[] lookups = { TcpReaderState.CONTENT, TcpReaderState.CONTENT_LENGTH };
+        for ( TcpReaderState lookup : lookups )
+        {
+            StartState found = states.get(lookup);
+            System.out.println( found.getClass().getName() );
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/sink/IMessageSink.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/sink/IMessageSink.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/sink/IMessageSink.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.sink;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IMessageSink
+{
+    /**
+     * Inspects, and possibly replaces, a message passing through the sink chain.
+     *
+     * @param message message handed to this sink.
+     * @return the message to hand to the next sink; SinkManager stops the
+     *         chain and returns null as soon as a sink returns null.
+     */
+    Message handleMessage(Message message);
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/sink/SinkManager.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/sink/SinkManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/sink/SinkManager.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.sink;
+
+import java.util.*;
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SinkManager
+{
+    /* Registered sinks, in registration order. */
+    private static LinkedList<IMessageSink> messageSinks_ = new LinkedList<IMessageSink>();
+
+    /**
+     * @return true when at least one sink is registered.
+     */
+    public static boolean isInitialized()
+    {
+        return !messageSinks_.isEmpty();
+    }
+
+    /**
+     * Registers a sink at the end of the chain.
+     *
+     * @param ms sink to append.
+     */
+    public static void addMessageSink(IMessageSink ms)
+    {
+        messageSinks_.add(ms);
+    }
+
+    /**
+     * Removes every registered sink.
+     */
+    public static void clearSinks()
+    {
+        messageSinks_.clear();
+    }
+
+    /**
+     * Passes the message through the sinks in registration order, feeding
+     * each sink the result of the previous one.
+     *
+     * @param message message to process.
+     * @return the result of the last sink, or null as soon as any sink
+     *         returns null; the input itself when no sink is registered.
+     */
+    public static Message processClientMessageSink(Message message)
+    {
+        Message current = message;
+        for ( IMessageSink sink : messageSinks_ )
+        {
+            current = sink.handleMessage(current);
+            if ( current == null )
+            {
+                break;
+            }
+        }
+        return current;
+    }
+
+    /**
+     * Passes the message through the sinks in reverse registration order,
+     * feeding each sink the result of the previous one.
+     *
+     * @param message message to process.
+     * @return the result of the last sink visited, or null as soon as any
+     *         sink returns null; the input itself when no sink is registered.
+     */
+    public static Message processServerMessageSink(Message message)
+    {
+        Message current = message;
+        /* Start past the last element so previous() walks the chain backwards. */
+        for ( ListIterator<IMessageSink> backwards = messageSinks_.listIterator(messageSinks_.size()); backwards.hasPrevious(); )
+        {
+            current = backwards.previous().handleMessage(current);
+            if ( current == null )
+            {
+                break;
+            }
+        }
+        return current;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/procedures/GroovyScriptRunner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/procedures/GroovyScriptRunner.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/procedures/GroovyScriptRunner.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/procedures/GroovyScriptRunner.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,13 @@
+package org.apache.cassandra.procedures;
+
+import groovy.lang.GroovyShell;
+
+public class GroovyScriptRunner
+{
+	/*
+	 * Single shell shared by every caller.
+	 * NOTE(review): all evaluations therefore go through the same shell
+	 * instance -- confirm that concurrent callers are safe with that.
+	 */
+	private static GroovyShell groovyShell_ = new GroovyShell();
+
+	/**
+	 * Evaluates a Groovy script and returns its result as a string.
+	 *
+	 * NOTE(review): the script text is executed as-is inside this process;
+	 * callers must not pass text that originates from an untrusted source.
+	 *
+	 * @param script Groovy source text to evaluate.
+	 * @return the string form of the script's result; the literal "null"
+	 *         when the script produces no value.
+	 */
+	public static String evaluateString(String script)
+	{
+		Object result = groovyShell_.evaluate(script);
+		/* A script may evaluate to null; valueOf avoids the NullPointerException toString() would throw. */
+		return String.valueOf(result);
+	}
+}

Added: incubator/cassandra/src/org/apache/cassandra/service/BootstrapAndLbHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/service/BootstrapAndLbHelper.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/service/BootstrapAndLbHelper.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/service/BootstrapAndLbHelper.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,64 @@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.cassandra.io.SSTable;
+import org.apache.log4j.Logger;
+
+public final class BootstrapAndLbHelper
+{
+    private static final Logger logger_ = Logger.getLogger(BootstrapAndLbHelper.class);
+
+    /**
+     * Collects the indexed keys for which this node is primary.
+     *
+     * The keys are filtered into a fresh list so that the list handed out by
+     * SSTable.getIndexedKeys() is never modified, whoever owns it.
+     *
+     * @return the indexed keys accepted by StorageService.isPrimary, in the
+     *         order SSTable.getIndexedKeys() returned them.
+     */
+    private static List<String> getIndexedPrimaryKeys()
+    {
+        List<String> indexedKeys = SSTable.getIndexedKeys();
+        List<String> indexedPrimaryKeys = new ArrayList<String>(indexedKeys.size());
+
+        for ( String key : indexedKeys )
+        {
+            if ( StorageService.instance().isPrimary(key) )
+            {
+                indexedPrimaryKeys.add(key);
+            }
+        }
+        return indexedPrimaryKeys;
+    }
+
+    /**
+     * Given the number of keys that need to be transferred say, 1000
+     * and given the smallest key stored we need the hash of the 1000th
+     * key greater than the smallest key in the sorted order in the primary
+     * range.
+     *
+     * The key is picked from the indexed primary keys at position
+     * keyCount / SSTable.indexInterval(); a position past the end falls
+     * back to the last indexed primary key.
+     *
+     * @param keyCount number of keys after which token is required.
+     * @return token.
+     * @throws IllegalStateException if there are no indexed primary keys to
+     *         pick from.
+     */
+    public static BigInteger getTokenBasedOnPrimaryCount(int keyCount)
+    {
+        List<String> indexedPrimaryKeys = getIndexedPrimaryKeys();
+        if ( indexedPrimaryKeys.isEmpty() )
+        {
+            /* Without this guard the fallback below would ask for element -1. */
+            throw new IllegalStateException("No indexed primary keys available to derive a token for key count " + keyCount);
+        }
+
+        /* Clamp to the last indexed key when keyCount reaches past the end of the index. */
+        int index = Math.min( keyCount / SSTable.indexInterval(), indexedPrimaryKeys.size() - 1 );
+        String key = indexedPrimaryKeys.get(index);
+        logger_.debug("Hashing key " + key + " ...");
+        return StorageService.instance().hash(key);
+    }
+}