You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [23/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/Header.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/Header.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/Header.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.GuidGenerator;
+
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Header implements java.io.Serializable
+{
+    static final long serialVersionUID = -3194851946523170022L;
+    private static ICompactSerializer<Header> serializer_;
+    private static AtomicInteger idGen_ = new AtomicInteger(0);
+    
+    static
+    {
+        serializer_ = new HeaderSerializer();        
+    }
+    
+    static ICompactSerializer<Header> serializer()
+    {
+        return serializer_;
+    }
+
+    private EndPoint from_;
+    private String type_;
+    private String verb_;
+    private String messageId_;
+    protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
+    
+    Header(String id, EndPoint from, String messageType, String verb)
+    {
+        messageId_ = id;
+        from_ = from;
+        type_ = messageType;
+        verb_ = verb;        
+    }
+    
+    Header(String id, EndPoint from, String messageType, String verb, Map<String, byte[]> details)
+    {
+        this(id, from, messageType, verb);
+        details_ = details;
+    }
+
+    Header(EndPoint from, String messageType, String verb)
+    {
+        messageId_ = Integer.toString(idGen_.incrementAndGet());
+        from_ = from;
+        type_ = messageType;
+        verb_ = verb;
+    }        
+
+    EndPoint getFrom()
+    {
+        return from_;
+    }
+
+    String getMessageType()
+    {
+        return type_;
+    }
+
+    String getVerb()
+    {
+        return verb_;
+    }
+
+    String getMessageId()
+    {
+        return messageId_;
+    }
+
+    void setMessageId(String id)
+    {
+        messageId_ = id;
+    }
+    
+    void setMessageType(String type)
+    {
+        type_ = type;
+    }
+    
+    void setMessageVerb(String verb)
+    {
+        verb_ = verb;
+    }
+    
+    byte[] getDetail(Object key)
+    {
+        return details_.get(key);
+    }
+    
+    void removeDetail(Object key)
+    {
+        details_.remove(key);
+    }
+    
+    void addDetail(String key, byte[] value)
+    {
+        details_.put(key, value);
+    }
+    
+    Map<String, byte[]> getDetails()
+    {
+        return details_;
+    }
+}
+
+class HeaderSerializer implements ICompactSerializer<Header>
+{
+    public void serialize(Header t, DataOutputStream dos) throws IOException
+    {           
+        dos.writeUTF(t.getMessageId());
+        CompactEndPointSerializationHelper.serialize(t.getFrom(), dos);
+        dos.writeUTF(t.getMessageType());
+        dos.writeUTF( t.getVerb() );
+        
+        /* Serialize the message header */
+        int size = t.details_.size();
+        dos.writeInt(size);
+        Set<String> keys = t.details_.keySet();
+        
+        for( String key : keys )
+        {
+            dos.writeUTF(key);
+            byte[] value = t.details_.get(key);
+            dos.writeInt(value.length);
+            dos.write(value);
+        }
+    }
+
+    public Header deserialize(DataInputStream dis) throws IOException
+    {
+        String id = dis.readUTF();
+        EndPoint from = CompactEndPointSerializationHelper.deserialize(dis);
+        String type = dis.readUTF();
+        String verb = dis.readUTF();
+        
+        /* Deserializing the message header */
+        int size = dis.readInt();
+        Map<String, byte[]> details = new Hashtable<String, byte[]>(size);
+        for ( int i = 0; i < size; ++i )
+        {
+            String key = dis.readUTF();
+            int length = dis.readInt();
+            byte[] bytes = new byte[length];
+            dis.readFully(bytes);
+            details.put(key, bytes);
+        }
+        
+        return new Header(id, from, type, verb, details);
+    }
+}
+
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/HeaderTypes.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/HeaderTypes.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/HeaderTypes.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/HeaderTypes.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class HeaderTypes 
+{
+    public final static String TASK_PROFILE_CHAIN = "TASK_PROFILE_CHAIN";
+    public static String TASK_ID = "TASK_ID";
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncCallback.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IAsyncCallback 
+{
+	/**
+	 * @param response responses to be returned
+	 */
+	public void response(Message msg);
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IAsyncResult.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IAsyncResult
+{
+    public Object[] get();
+    public boolean isDone();
+    public Object[] get(long timeout, TimeUnit tu) throws TimeoutException; 
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IMessagingService.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.cassandra.concurrent.IStage;
+
+
+/**
+ * An IMessagingService provides the methods for sending messages to remote
+ * endpoints. IMessagingService enables the sending of request-response style
+ * messages and fire-forget style messages.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IMessagingService
+{   
+	/**
+     * Register a verb and the corresponding verb handler with the
+     * Messaging Service.
+     * @param type name of the verb.     
+     * @param verbHandler handler for the specified verb
+     */
+    public void registerVerbHandlers(String type, IVerbHandler verbHandler);
+    
+    /**
+     * Deregister all verbhandlers corresponding to localEndPoint.
+     * @param localEndPoint
+     */
+    public void deregisterAllVerbHandlers(EndPoint localEndPoint);
+    
+    /**
+     * Deregister a verbhandler corresponding to the verb from the
+     * Messaging Service.
+     * @param type name of the verb.      
+     */
+    public void deregisterVerbHandlers(String type);
+    
+    /**
+     * Listen on the specified port.
+     * @param ep EndPoint whose port to listen on.
+     * @param isHttp specify if the port is an Http port.     
+     */
+    public void listen(EndPoint ep, boolean isHttp) throws IOException;
+    
+    /**
+     * Listen on the specified port.
+     * @param ep EndPoint whose port to listen on.     
+     */
+    public void listenUDP(EndPoint ep);
+    
+    /**
+     * Send a message to a given endpoint. 
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent
+     * @return an reference to an IAsyncResult which can be queried for the
+     * response
+     */
+    public IAsyncResult sendRR(Message message, EndPoint to);
+
+    /**
+     * Send a message to the given set of endpoints and informs the MessagingService
+     * to wait for at least <code>howManyResults</code> responses to determine success
+     * of failure.
+     * @param message message to be sent.
+     * @param to endpoints to which the message needs to be sent
+     * @param cb callback interface which is used to pass the responses
+     * @return an reference to message id used to match with the result
+     */
+    public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb);
+    
+    /**
+     * Send a message to a given endpoint. This method specifies a callback
+     * which is invoked with the actual response.
+     * @param message message to be sent.
+     * @param to endpoint to which the message needs to be sent
+     * @param cb callback interface which is used to pass the responses or
+     *           suggest that a timeout occured to the invoker of the send().
+     *           suggest that a timeout occured to the invoker of the send().
+     * @return an reference to message id used to match with the result
+     */
+    public String sendRR(Message message, EndPoint to, IAsyncCallback cb);
+
+    /**
+     * Send a message to a given endpoint. The ith element in the <code>messages</code>
+     * array is sent to the ith element in the <code>to</code> array.This method assumes
+     * there is a one-one mapping between the <code>messages</code> array and
+     * the <code>to</code> array. Otherwise an  IllegalArgumentException will be thrown.
+     * This method also informs the MessagingService to wait for at least
+     * <code>howManyResults</code> responses to determine success of failure.
+     * @param messages messages to be sent.
+     * @param to endpoints to which the message needs to be sent
+     * @param cb callback interface which is used to pass the responses or
+     *           suggest that a timeout occured to the invoker of the send().
+     *           suggest that a timeout occured to the invoker of the send().
+     * @return an reference to message id used to match with the result
+     */
+    public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb);
+
+    /**
+     * Send a message to a given endpoint. This method adheres to the fire and forget
+     * style messaging.
+     * @param message messages to be sent.
+     * @param to endpoint to which the message needs to be sent
+     */
+    public void sendOneWay(Message message, EndPoint to);
+        
+    /**
+     * Send a message to a given endpoint. This method adheres to the fire and forget
+     * style messaging.
+     * @param message messages to be sent.
+     * @param to endpoint to which the message needs to be sent
+     */
+    public void sendUdpOneWay(Message message, EndPoint to);
+    
+    /**
+     * Stream a file from source to destination. This is highly optimized
+     * to not hold any of the contents of the file in memory.
+     * @param file name of file to stream.
+     * param start position inside the file
+     * param total number of bytes to stream
+     * param to endpoint to which we need to stream the file.
+    */
+    public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to);
+
+    /**
+     * This method returns the verb handler associated with the registered
+     * verb. If no handler has been registered then null is returned.
+     * @param verb for which the verb handler is sought
+     * @return a reference to IVerbHandler which is the handler for the specified verb
+     */
+    public IVerbHandler getVerbHandler(String verb);    
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/IVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/IVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/IVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/IVerbHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * IVerbHandler provides the method that all verb handlers need to implement.
+ * The concrete implementation of this interface would provide the functionality
+ * for a given verb.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IVerbHandler
+{
+    /**
+     * This method delivers a message to the implementing class (if the implementing
+     * class was registered by a call to MessagingService.registerVerbHandlers).
+     * Note that the caller should not be holding any locks when calling this method
+     * because the implementation may be synchronized.
+     * 
+     * @param message - incoming message that needs handling.     
+     */
+    public void doVerb(Message message);
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/Message.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.lang.reflect.Array;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Message implements java.io.Serializable
+{    
+    static final long serialVersionUID = 6329198792470413221L;
+    private static ICompactSerializer<Message> serializer_;
+    
+    static
+    {
+        serializer_ = new MessageSerializer();        
+    }
+    
+    public static ICompactSerializer<Message> serializer()
+    {
+        return serializer_;
+    }
+    
+    Header header_;
+    private Object[] body_ = new Object[0];
+    
+    /* Ctor for JAXB. DO NOT DELETE */
+    private Message()
+    {
+    }
+
+    protected Message(String id, EndPoint from, String messageType, String verb, Object[] body)
+    {
+        header_ = new Header(id, from, messageType, verb);
+        body_ = body;
+    }
+    
+    protected Message(Header header, Object[] body)
+    {
+        header_ = header;
+        body_ = body;
+    }
+
+    public Message(EndPoint from, String messageType, String verb, Object[] body)
+    {
+        header_ = new Header(from, messageType, verb);
+        body_ = body;
+    }    
+    
+    public byte[] getHeader(Object key)
+    {
+        return header_.getDetail(key);
+    }
+    
+    public void removeHeader(Object key)
+    {
+        header_.removeDetail(key);
+    }
+    
+    public void setMessageType(String type)
+    {
+        header_.setMessageType(type);
+    }
+
+    public void setMessageVerb(String verb)
+    {
+        header_.setMessageVerb(verb);
+    }
+
+    public void addHeader(String key, byte[] value)
+    {
+        header_.addDetail(key, value);
+    }
+    
+    public Map<String, byte[]> getHeaders()
+    {
+        return header_.getDetails();
+    }
+
+    public Object[] getMessageBody()
+    {
+        return body_;
+    }
+    
+    public void setMessageBody(Object[] body)
+    {
+        body_ = body;
+    }
+
+    public EndPoint getFrom()
+    {
+        return header_.getFrom();
+    }
+
+    public String getMessageType()
+    {
+        return header_.getMessageType();
+    }
+
+    public String getVerb()
+    {
+        return header_.getVerb();
+    }
+
+    public String getMessageId()
+    {
+        return header_.getMessageId();
+    }
+    
+    public Class[] getTypes()
+    {
+        List<Class> types = new ArrayList<Class>();
+        
+        for ( int i = 0; i < body_.length; ++i )
+        {
+            if ( body_[i].getClass().isArray() )
+            {
+                int size = Array.getLength(body_[i]);
+                if ( size > 0 )
+                {
+                    types.add( Array.get( body_[i], 0).getClass() );
+                }
+            }
+            else
+            {
+                types.add(body_[i].getClass());
+            }
+        }
+        
+        return types.toArray( new Class[0] );
+    }    
+
+    void setMessageId(String id)
+    {
+        header_.setMessageId(id);
+    }    
+
+    public Message getReply(EndPoint from, Object[] args)
+    {        
+        Message response = new Message(getMessageId(),
+                                       from,
+                                       MessagingService.responseStage_,
+                                       MessagingService.responseVerbHandler_,
+                                       args);
+        return response;
+    }
+    
+    public String toString()
+    {
+        StringBuffer sbuf = new StringBuffer("");
+        String separator = System.getProperty("line.separator");
+        sbuf.append("ID:" + getMessageId());
+        sbuf.append(separator);
+        sbuf.append("FROM:" + getFrom());
+        sbuf.append(separator);
+        sbuf.append("TYPE:" + getMessageType());
+        sbuf.append(separator);
+        sbuf.append("VERB:" + getVerb());
+        sbuf.append(separator);
+        sbuf.append("BODY TYPE:" + getBodyTypes());        
+        sbuf.append(separator);
+        return sbuf.toString();
+    }
+    
+    private String getBodyTypes()
+    {
+        StringBuffer sbuf = new StringBuffer("");
+        Class[] types = getTypes();
+        for ( int i = 0; i < types.length; ++i )
+        {
+            sbuf.append(types[i].getName());
+            sbuf.append(" ");         
+        }
+        return sbuf.toString();
+    }    
+}
+
+class MessageSerializer implements ICompactSerializer<Message>
+{
+    public void serialize(Message t, DataOutputStream dos) throws IOException
+    {
+        Header.serializer().serialize( t.header_, dos);
+        byte[] bytes = (byte[])t.getMessageBody()[0];
+        dos.writeInt(bytes.length);
+        dos.write(bytes);
+    }
+
+    public Message deserialize(DataInputStream dis) throws IOException
+    {
+        Header header = Header.serializer().deserialize(dis);
+        int size = dis.readInt();
+        byte[] bytes = new byte[size];
+        dis.readFully(bytes);
+        // return new Message(header.getMessageId(), header.getFrom(), header.getMessageType(), header.getVerb(), new Object[]{bytes});
+        return new Message(header, new Object[]{bytes});
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeliveryTask.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeliveryTask.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeliveryTask.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessageDeliveryTask implements Runnable
+{
+    private Message message_;
+    private static Logger logger_ = Logger.getLogger(MessageDeliveryTask.class);    
+    
+    public MessageDeliveryTask(Message message)
+    {
+        message_ = message;    
+    }
+    
+    public void run()
+    { 
+        try
+        {            
+            String verb = message_.getVerb();                               
+            IVerbHandler verbHandler = MessagingService.getMessagingInstance().getVerbHandler(verb);           
+            if ( verbHandler != null )
+            {
+                verbHandler.doVerb(message_);        
+            }
+        }
+        catch (Throwable th)
+        {
+            logger_.warn( LogUtil.throwableToString(th) );
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeserializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeserializationTask.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeserializationTask.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageDeserializationTask.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MessageDeserializationTask implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(MessageDeserializationTask.class); 
+    private static ISerializer serializer_ = new FastSerializer();
+    private int serializerType_;
+    private byte[] bytes_ = new byte[0];    
+    
+    MessageDeserializationTask(int serializerType, byte[] bytes)
+    {
+        serializerType_ = serializerType;
+        bytes_ = bytes;        
+    }
+    
+    public void run()
+    {
+    	/* For DEBUG only. Printing queue length */   
+    	DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)MessagingService.getDeserilizationExecutor();
+        logger_.debug( "Message Deserialization Task: " + (es.getTaskCount() - es.getCompletedTaskCount()) );
+        /* END DEBUG */
+        try
+        {                        
+            Message message = (Message)serializer_.deserialize(bytes_);                                                           
+            
+            if ( message != null )
+            {
+                message = SinkManager.processServerMessageSink(message);
+                MessagingService.receive(message);                                                                                                    
+            }
+        }
+        catch ( IOException ex )
+        {            
+            logger_.warn(LogUtil.throwableToString(ex));              
+        }
+    }
+
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageSerializationTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageSerializationTask.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageSerializationTask.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessageSerializationTask.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.net.SocketException;
+
+import org.apache.cassandra.concurrent.Context;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadLocalContext;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class MessageSerializationTask implements Runnable
+{
+    private static Logger logger_ = Logger.getLogger(MessageSerializationTask.class);
+    private Message message_;
+    private EndPoint to_;    
+    
+    public MessageSerializationTask(Message message, EndPoint to)
+    {
+        message_ = message;
+        to_ = to;        
+    }
+    
+    public Message getMessage()
+    {
+        return message_;
+    }
+
+    public void run()
+    {        
+    	/* For DEBUG only. Printing queue length */   
+    	DebuggableThreadPoolExecutor es = (DebuggableThreadPoolExecutor)MessagingService.getWriteExecutor();
+        logger_.debug( "Message Serialization Task: " + (es.getTaskCount() - es.getCompletedTaskCount()) );
+        /* END DEBUG */
+        
+        /* Adding the message to be serialized in the TLS. For accessing in the afterExecute() */
+        Context ctx = new Context();
+        ctx.put(this.getClass().getName(), message_);
+        ThreadLocalContext.put(ctx);
+        
+        TcpConnection connection = null;
+        try
+        {
+            Message message = SinkManager.processClientMessageSink(message_);
+            if(null == message) 
+                return;
+            connection = MessagingService.getConnection(message_.getFrom(), to_);
+            connection.write(message);            
+        }            
+        catch ( SocketException se )
+        {            
+            // Shutting down the entire pool. May be too conservative an approach.
+            MessagingService.getConnectionPool(message_.getFrom(), to_).shutdown();
+            logger_.warn(LogUtil.throwableToString(se));
+        }
+        catch ( IOException e )
+        {
+            logConnectAndIOException(e, connection);
+        }
+        catch (Throwable th)
+        {
+            logger_.warn(LogUtil.throwableToString(th));
+        }
+        finally
+        {
+            if ( connection != null )
+            {
+                connection.close();
+            }            
+        }
+    }
+    
+    private void logConnectAndIOException(IOException ex, TcpConnection connection)
+    {                    
+        if ( connection != null )
+        {
+            connection.errorClose();
+        }
+        logger_.warn(LogUtil.throwableToString(ex));
+    }
+}
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingConfig.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingConfig.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingConfig.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingConfig.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessagingConfig
+{
+    // The expected time for one message round trip.  It does not reflect message processing
+    // time at the receiver.
+    private static int expectedRoundTripTime_ = 400;
+    private static int numberOfPorts_ = 2;
+    private static int threadCount_ = 4;
+
+    public static int getMessagingThreadCount()
+    {
+        return threadCount_;
+    }
+
+    public static void setMessagingThreadCount(int threadCount)
+    {
+        threadCount_ = threadCount;
+    }
+
+    public static void setExpectedRoundTripTime(int roundTripTimeMillis) {
+    	if(roundTripTimeMillis > 0 )
+    		expectedRoundTripTime_ = roundTripTimeMillis;
+    }
+
+    public static int getExpectedRoundTripTime()
+    {
+        return expectedRoundTripTime_;
+    }
+
+    public static int getConnectionPoolInitialSize()
+    {
+        return ConnectionPoolConfiguration.initialSize_;
+    }
+
+    public static int getConnectionPoolGrowthFactor()
+    {
+        return ConnectionPoolConfiguration.growthFactor_;
+    }
+
+    public static int getConnectionPoolMaxSize()
+    {
+        return ConnectionPoolConfiguration.maxSize_;
+    }
+
+    public static int getConnectionPoolWaitTimeout()
+    {
+        return ConnectionPoolConfiguration.waitTimeout_;
+    }
+
+    public static int getConnectionPoolMonitorInterval()
+    {
+        return ConnectionPoolConfiguration.monitorInterval_;
+    }
+
+    public static void setNumberOfPorts(int n)
+    {
+        numberOfPorts_ = n;
+    }
+
+    public static int getNumberOfPorts()
+    {
+        return numberOfPorts_;
+    }
+}
+
+class ConnectionPoolConfiguration
+{
+    public static int initialSize_ = 1;
+    public static int growthFactor_ = 1;
+    public static int maxSize_ = 1;
+    public static int waitTimeout_ = 10;
+    public static int monitorInterval_ = 300;
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingService.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,743 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.net.*;
+import java.security.MessageDigest;
+import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.nio.channels.*;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.utils.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.xml.bind.*;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.IStage;
+import org.apache.cassandra.concurrent.MultiThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.http.HttpConnectionHandler;
+import org.apache.cassandra.net.io.SerializerType;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessagingService implements IMessagingService, MessagingServiceMBean
+{
+    private static boolean debugOn_ = false;   
+    
+    private static int version_ = 1;
+    //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
+    private static SerializerType serializerType_ = SerializerType.BINARY;
+    
+    private static byte[] protocol_ = new byte[16];
+    /* Verb Handler for the Response */
+    public static final String responseVerbHandler_ = "RESPONSE";
+    /* Stage for responses. */
+    public static final String responseStage_ = "RESPONSE-STAGE";
+    private enum ReservedVerbs_ {RESPONSE};
+    
+    private static Map<String, String> reservedVerbs_ = new Hashtable<String, String>();
+    /* Indicate if we are currently streaming data to another node or receiving streaming data */
+    private static AtomicBoolean isStreaming_ = new AtomicBoolean(false);
+    
+    /* This records all the results mapped by message Id */
+    private static ICachetable<String, IAsyncCallback> callbackMap_;
+    private static ICachetable<String, IAsyncResult> taskCompletionMap_;
+    
+    /* Manages the table of endpoints it is listening on */
+    private static Set<EndPoint> endPoints_;
+    
+    /* List of sockets we are listening on */
+    private static Map<EndPoint, SelectionKey> listenSockets_ = new HashMap<EndPoint, SelectionKey>();
+    
+    /* Lookup table for registering message handlers based on the verb. */
+    private static Map<String, IVerbHandler> verbHandlers_;
+    
+    private static Map<String, MulticastSocket> mCastMembership_ = new HashMap<String, MulticastSocket>();
+    
+    /* Thread pool to handle messaging read activities of Socket and default stage */
+    private static ExecutorService messageDeserializationExecutor_;
+    
+    /* Thread pool to handle messaging write activities */
+    private static ExecutorService messageSerializerExecutor_;
+    
+    /* Thread pool to handle deserialization of messages read from the socket. */
+    private static ExecutorService messageDeserializerExecutor_;
+    
+    /* Thread pool to handle messaging write activities */
+    private static ExecutorService streamExecutor_;
+    
+    private final static ReentrantLock lock_ = new ReentrantLock();
+    private static Map<String, TcpConnectionManager> poolTable_ = new Hashtable<String, TcpConnectionManager>();
+    
+    private static boolean bShutdown_ = false;
+    
+    private static Logger logger_ = Logger.getLogger(MessagingService.class);
+    
+    private static IMessagingService messagingService_ = new MessagingService();
+    
+    public static boolean isDebugOn()
+    {
+        return debugOn_;
+    }
+    
+    public static void debugOn(boolean on)
+    {
+        debugOn_ = on;
+    }
+    
+    public static SerializerType getSerializerType()
+    {
+        return serializerType_;
+    }
+    
+    public synchronized static void serializerType(String type)
+    { 
+        if ( type.equalsIgnoreCase("binary") )
+        {
+            serializerType_ = SerializerType.BINARY;
+        }
+        else if ( type.equalsIgnoreCase("java") )
+        {
+            serializerType_ = SerializerType.JAVA;
+        }
+        else if ( type.equalsIgnoreCase("xml") )
+        {
+            serializerType_ = SerializerType.XML;
+        }
+    }
+    
+    public static int getVersion()
+    {
+        return version_;
+    }
+    
+    public static void setVersion(int version)
+    {
+        version_ = version;
+    }
+    
+    public static IMessagingService getMessagingInstance()
+    {   
+    	if ( bShutdown_ )
+    	{
+            lock_.lock();
+            try
+            {
+                if ( bShutdown_ )
+                {
+            		messagingService_ = new MessagingService();
+            		bShutdown_ = false;
+                }
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+    	}
+        return messagingService_;
+    }
+    
+    public Object clone() throws CloneNotSupportedException
+    {
+        //Prevents the singleton from being cloned
+        throw new CloneNotSupportedException();
+    }
+
+    protected MessagingService()
+    {        
+        for ( ReservedVerbs_ verbs : ReservedVerbs_.values() )
+        {
+            reservedVerbs_.put(verbs.toString(), verbs.toString());
+        }
+        verbHandlers_ = new HashMap<String, IVerbHandler>();        
+        endPoints_ = new HashSet<EndPoint>();
+        /*
+         * Leave callbacks in the cachetable long enough that any related messages will arrive
+         * before the callback is evicted from the table. The concurrency level is set at 128
+         * which is the sum of the threads in the pool that adds shit into the table and the 
+         * pool that retrives the callback from here.
+        */ 
+        int maxSize = MessagingConfig.getMessagingThreadCount();
+        callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
+        taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );        
+        
+        messageDeserializationExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+                maxSize,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl("MESSAGING-SERVICE-POOL")
+                );
+                
+        messageSerializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+                maxSize,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl("MESSAGE-SERIALIZER-POOL")
+                ); 
+        
+        messageDeserializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+                maxSize,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl("MESSAGE-DESERIALIZER-POOL")
+                ); 
+        
+        streamExecutor_ = new DebuggableThreadPoolExecutor( 1,
+                1,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl("MESSAGE-STREAMING-POOL")
+                ); 
+                
+        protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());        
+        /* register the response verb handler */
+        registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
+        /* register stage for response */
+        StageManager.registerStage(MessagingService.responseStage_, new MultiThreadedStage("RESPONSE-STAGE", maxSize) );
+    }
+    
+    public byte[] hash(String type, byte data[])
+    {
+        byte result[] = null;
+        try
+        {
+            MessageDigest messageDigest = MessageDigest.getInstance(type);
+            result = messageDigest.digest(data);
+        }
+        catch(Exception e)
+        {
+            LogUtil.getLogger(MessagingService.class.getName()).debug(LogUtil.throwableToString(e));
+        }
+        return result;
+    }
+    
+    public long getMessagingSerializerTaskCount()
+    {
+        DebuggableThreadPoolExecutor dstp = (DebuggableThreadPoolExecutor)messageSerializerExecutor_;        
+        return dstp.getTaskCount() - dstp.getCompletedTaskCount();
+    }
+    
+    public long getMessagingReceiverTaskCount()
+    {
+        DebuggableThreadPoolExecutor dstp = (DebuggableThreadPoolExecutor)messageDeserializationExecutor_;        
+        return dstp.getTaskCount() - dstp.getCompletedTaskCount(); 
+    }
+    
+    public void listen(EndPoint localEp, boolean isHttp) throws IOException
+    {        
+        ServerSocketChannel serverChannel = ServerSocketChannel.open();
+        ServerSocket ss = serverChannel.socket();            
+        ss.bind(localEp.getInetAddress());
+        serverChannel.configureBlocking(false);
+        
+        SelectionKeyHandler handler = null;
+        if ( isHttp )
+        {                
+            handler = new HttpConnectionHandler();
+        }
+        else
+        {
+            handler = new TcpConnectionHandler(localEp);
+        }
+        
+        SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);          
+        endPoints_.add(localEp);            
+        listenSockets_.put(localEp, key);             
+    }
+    
+    public void listenUDP(EndPoint localEp)
+    {
+        UdpConnection connection = new UdpConnection();
+        logger_.debug("Starting to listen on " + localEp);
+        try
+        {
+            connection.init(localEp.getPort());
+            endPoints_.add(localEp);     
+        }
+        catch ( IOException e )
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+        }
+    }
+    
+    public static TcpConnectionManager getConnectionPool(EndPoint from, EndPoint to)
+    {
+        String key = from + ":" + to;
+        TcpConnectionManager cp = poolTable_.get(key);
+        if( cp == null )
+        {
+            lock_.lock();
+            try
+            {
+                cp = poolTable_.get(key);
+                if (cp == null )
+                {
+                    cp = new TcpConnectionManager(MessagingConfig.getConnectionPoolInitialSize(), 
+                            MessagingConfig.getConnectionPoolGrowthFactor(), 
+                            MessagingConfig.getConnectionPoolMaxSize(), from, to);
+                    poolTable_.put(key, cp);
+                }
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+        return cp;
+    }
+
+    public static ConnectionStatistics[] getPoolStatistics()
+    {
+        Set<ConnectionStatistics> stats = new HashSet<ConnectionStatistics>();        
+        Iterator<TcpConnectionManager> it = poolTable_.values().iterator();
+        while ( it.hasNext() )
+        {
+            TcpConnectionManager cp = it.next();
+            ConnectionStatistics cs = new ConnectionStatistics(cp.getLocalEndPoint(), cp.getRemoteEndPoint(), cp.getPoolSize(), cp.getConnectionsInUse());
+            stats.add( cs );
+        }
+        return stats.toArray(new ConnectionStatistics[0]);
+    }
+    
+    public static TcpConnection getConnection(EndPoint from, EndPoint to) throws IOException
+    {
+        return getConnectionPool(from, to).getConnection();
+    }
+    
+    private void checkForReservedVerb(String type)
+    {
+    	if ( reservedVerbs_.get(type) != null && verbHandlers_.get(type) != null )
+    	{
+    		throw new IllegalArgumentException( type + " is a reserved verb handler. Scram!");
+    	}
+    }     
+    
+    public void registerVerbHandlers(String type, IVerbHandler verbHandler)
+    {
+    	checkForReservedVerb(type);
+    	verbHandlers_.put(type, verbHandler);
+    }
+    
+    public void deregisterAllVerbHandlers(EndPoint localEndPoint)
+    {
+        Iterator keys = verbHandlers_.keySet().iterator();
+        String key = null;
+        
+        /*
+         * endpoint specific verbhandlers can be distinguished because
+         * their key's contain the name of the endpoint. 
+         */
+        while(keys.hasNext())
+        {
+            key = (String)keys.next();
+            if(-1 != key.indexOf(localEndPoint.toString()))
+                keys.remove();
+        }
+    }
+    
+    public void deregisterVerbHandlers(String type)
+    {
+        verbHandlers_.remove(type);
+    }
+
+    public IVerbHandler getVerbHandler(String type)
+    {
+        IVerbHandler handler = (IVerbHandler)verbHandlers_.get(type);
+        return handler;
+    }
+
+    public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb)
+    {
+        String messageId = message.getMessageId();                        
+        callbackMap_.put(messageId, cb);
+        for ( int i = 0; i < to.length; ++i )
+        {
+            sendOneWay(message, to[i]);
+        }
+        return messageId;
+    }
+    
+    public String sendRR(Message message, EndPoint to, IAsyncCallback cb)
+    {        
+        String messageId = message.getMessageId();
+        callbackMap_.put(messageId, cb);
+        sendOneWay(message, to);
+        return messageId;
+    }
+
+    public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb)
+    {
+        if ( messages.length != to.length )
+        {
+            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+        }
+        String groupId = GuidGenerator.guid();
+        callbackMap_.put(groupId, cb);
+        for ( int i = 0; i < messages.length; ++i )
+        {
+            messages[i].setMessageId(groupId);
+            sendOneWay(messages[i], to[i]);
+        }
+        return groupId;
+    }    
+
+    /*
+        Use this version for fire and forget style messaging.
+    */
+    public void sendOneWay(Message message, EndPoint to)
+    {        
+        // do local deliveries        
+        if ( message.getFrom().equals(to) )
+        {            
+            MessagingService.receive(message);
+            return;
+        }
+        
+        Runnable tcpWriteEvent = new MessageSerializationTask(message, to);
+        messageSerializerExecutor_.execute(tcpWriteEvent);    
+    }
+    
+    public IAsyncResult sendRR(Message message, EndPoint to)
+    {
+        IAsyncResult iar = new AsyncResult();
+        taskCompletionMap_.put(message.getMessageId(), iar);
+        sendOneWay(message, to);
+        return iar;
+    }
+    
+    public void sendUdpOneWay(Message message, EndPoint to)
+    {
+        EndPoint from = message.getFrom();              
+        if (message.getFrom().equals(to)) {
+            MessagingService.receive(message);
+            return;
+        }
+        
+        UdpConnection connection = null;
+        try
+        {
+            connection = new UdpConnection(); 
+            connection.init();            
+            connection.write(message, to);            
+        }            
+        catch ( IOException e )
+        {               
+            logger_.warn(LogUtil.throwableToString(e));
+        } 
+        finally
+        {
+            if ( connection != null )
+                connection.close();
+        }
+    }
+    
+    public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to)
+    {
+        isStreaming_.set(true);
+        /* Streaming asynchronously on streamExector_ threads. */
+        Runnable streamingTask = new FileStreamTask(file, startPosition, total, from, to);
+        streamExecutor_.execute(streamingTask);
+    }
+    
+    /*
+     * Does the application determine if we are currently streaming data.
+     * This would imply either streaming to a receiver, receiving streamed
+     * data or both. 
+    */
+    public static boolean isStreaming()
+    {
+        return isStreaming_.get();
+    }
+    
+    public static void setStreamingMode(boolean bVal)
+    {
+        isStreaming_.set(bVal);
+    }
+    
+    public static void shutdown()
+    {
+        logger_.info("Shutting down ...");
+        synchronized ( MessagingService.class )
+        {          
+            /* Stop listening on any socket */            
+            for( SelectionKey skey : listenSockets_.values() )
+            {
+                SelectorManager.getSelectorManager().cancel(skey);
+            }
+            listenSockets_.clear();
+            
+            /* Shutdown the threads in the EventQueue's */            
+            messageDeserializationExecutor_.shutdownNow();            
+            messageSerializerExecutor_.shutdownNow();
+            messageDeserializerExecutor_.shutdownNow();
+            streamExecutor_.shutdownNow();
+            
+            /* shut down the cachetables */
+            taskCompletionMap_.shutdown();
+            callbackMap_.shutdown();                        
+                        
+            /* Interrupt the selector manager thread */
+            SelectorManager.getSelectorManager().interrupt();
+            
+            poolTable_.clear();            
+            verbHandlers_.clear();                                    
+            bShutdown_ = true;
+        }
+        logger_.debug("Shutdown invocation complete.");
+    }
+
+    public static void receive(Message message)
+    {        
+        enqueueRunnable(message.getMessageType(), new MessageDeliveryTask(message));
+    }
+    
+    public static boolean isLocalEndPoint(EndPoint ep)
+    {
+        return ( endPoints_.contains(ep) );
+    }
+        
+    private static void enqueueRunnable(String stageName, Runnable runnable){
+        
+        IStage stage = StageManager.getStage(stageName);   
+        
+        if ( stage != null )
+        {
+            logger_.info("Running on stage " + stage.getName());
+            stage.execute(runnable);
+        } 
+        else
+        {
+            logger_.info("Running on default stage - beware");
+            messageSerializerExecutor_.execute(runnable);
+        }
+    }    
+    
+    public static IAsyncCallback getRegisteredCallback(String key)
+    {
+        return callbackMap_.get(key);
+    }
+    
+    public static void removeRegisteredCallback(String key)
+    {
+        callbackMap_.remove(key);
+    }
+    
+    public static IAsyncResult getAsyncResult(String key)
+    {
+        return taskCompletionMap_.remove(key);
+    }
+    
+    public static void removeAsyncResult(String key)
+    {
+        taskCompletionMap_.remove(key);
+    }
+
+    public static byte[] getProtocol()
+    {
+        return protocol_;
+    }
+    
+    public static ExecutorService getReadExecutor()
+    {
+        return messageDeserializationExecutor_;
+    }
+    
+    public static ExecutorService getWriteExecutor()
+    {
+        return messageSerializerExecutor_;
+    }
+    
+    public static ExecutorService getDeserilizationExecutor()
+    {
+        return messageDeserializerExecutor_;
+    }
+
+    public static boolean isProtocolValid(byte[] protocol)
+    {
+        return isEqual(protocol_, protocol);
+    }
+    
+    public static boolean isEqual(byte digestA[], byte digestB[])
+    {
+        return MessageDigest.isEqual(digestA, digestB);
+    }
+
+    public static byte[] toByteArray(int i)
+    {
+        byte bytes[] = new byte[4];
+        bytes[0] = (byte)(i >>> 24 & 0xff);
+        bytes[1] = (byte)(i >>> 16 & 0xff);
+        bytes[2] = (byte)(i >>> 8 & 0xff);
+        bytes[3] = (byte)(i & 0xff);
+        return bytes;
+    }
+    
+    public static byte[] toByteArray(short s)
+    {
+        byte bytes[] = new byte[2];
+        bytes[0] = (byte)(s >>> 8 & 0xff);
+        bytes[1] = (byte)(s & 0xff);
+        return bytes;
+    }
+    
+    public static short byteArrayToShort(byte bytes[])
+    {
+        return byteArrayToShort(bytes, 0);
+    }
+    
+    public static short byteArrayToShort(byte bytes[], int offset)
+    {
+        if(bytes.length - offset < 2)
+            throw new IllegalArgumentException("A short must be 2 bytes in size.");
+        short n = 0;
+        for(int i = 0; i < 2; i++)
+        {
+            n <<= 8;
+            n |= bytes[offset + i] & 0xff;
+        }
+
+        return n;
+    }
+
+    public static int getBits(int x, int p, int n)
+    {
+        return x >>> (p + 1) - n & ~(-1 << n);
+    }
+    
+    public static int byteArrayToInt(byte bytes[])
+    {
+        return byteArrayToInt(bytes, 0);
+    }
+
+    public static int byteArrayToInt(byte bytes[], int offset)
+    {
+        if(bytes.length - offset < 4)
+            throw new IllegalArgumentException("An integer must be 4 bytes in size.");
+        int n = 0;
+        for(int i = 0; i < 4; i++)
+        {
+            n <<= 8;
+            n |= bytes[offset + i] & 0xff;
+        }
+
+        return n;
+    }
+    
+    public static ByteBuffer packIt(byte[] bytes, boolean compress, boolean stream, boolean listening)
+    {
+        byte[] size = toByteArray(bytes.length);
+        /* 
+             Setting up the protocol header. This is 4 bytes long
+             represented as an integer. The first 2 bits indicate
+             the serializer type. The 3rd bit indicates if compression
+             is turned on or off. It is turned off by default. The 4th
+             bit indicates if we are in streaming mode. It is turned off
+             by default. The 5th bit is used to indicate that the sender
+             is not listening on any well defined port. This implies the 
+             receiver needs to cache the connection using the port on the 
+             socket. The following 3 bits are reserved for future use. 
+             The next 8 bits indicate a version number. Remaining 15 bits 
+             are not used currently.            
+        */
+        int n = 0;
+        // Setting up the serializer bit
+        n |= serializerType_.ordinal();
+        // set compression bit.
+        if ( compress )
+            n |= 4;
+        
+        // set streaming bit
+        if ( stream )
+            n |= 8;
+        
+        // set listening 5th bit
+        if ( listening )
+            n |= 16;
+        
+        // Setting up the version bit 
+        n |= (version_ << 8);               
+        /* Finished the protocol header setup */
+               
+        byte[] header = toByteArray(n);
+        ByteBuffer buffer = ByteBuffer.allocate(16 + header.length + size.length + bytes.length);
+        buffer.put(protocol_);
+        buffer.put(header);
+        buffer.put(size);
+        buffer.put(bytes);
+        buffer.flip();
+        return buffer;
+    }
+        
+    public static ByteBuffer constructStreamHeader(boolean compress, boolean stream)
+    {
+        /* 
+        Setting up the protocol header. This is 4 bytes long
+        represented as an integer. The first 2 bits indicate
+        the serializer type. The 3rd bit indicates if compression
+        is turned on or off. It is turned off by default. The 4th
+        bit indicates if we are in streaming mode. It is turned off
+        by default. The following 4 bits are reserved for future use. 
+        The next 8 bits indicate a version number. Remaining 15 bits 
+        are not used currently.            
+        */
+        int n = 0;
+        // Setting up the serializer bit
+        n |= serializerType_.ordinal();
+        // set compression bit.
+        if ( compress )
+            n |= 4;
+       
+        // set streaming bit
+        if ( stream )
+            n |= 8;
+       
+        // Setting up the version bit 
+        n |= (version_ << 8);              
+        /* Finished the protocol header setup */
+              
+        byte[] header = toByteArray(n);
+        ByteBuffer buffer = ByteBuffer.allocate(16 + header.length);
+        buffer.put(protocol_);
+        buffer.put(header);
+        buffer.flip();
+        return buffer;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingServiceMBean.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingServiceMBean.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/MessagingServiceMBean.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface MessagingServiceMBean
+{   
+    public long getMessagingSerializerTaskCount();
+    public long getMessagingReceiverTaskCount();
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/ProtocolHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/ProtocolHeader.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/ProtocolHeader.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/ProtocolHeader.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class ProtocolHeader
+{
+    public static final String SERIALIZER = "SERIALIZER";
+    public static final String COMPRESSION = "COMPRESSION";
+    public static final String VERSION = "VERSION";
+    
+    public int serializerType_;
+    public boolean isCompressed_;
+    public boolean isStreamingMode_;
+    public boolean isListening_;
+    public int version_;
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/ResponseVerbHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ResponseVerbHandler implements IVerbHandler
+{
+    private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
+    
+    public void doVerb(Message message)
+    {     
+        String messageId = message.getMessageId();        
+        IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
+        if ( cb != null )
+        {
+            logger_.info("Processing response on a callback from " + message.getFrom());
+            cb.response(message);
+        }
+        else
+        {            
+            AsyncResult ar = (AsyncResult)MessagingService.getAsyncResult(messageId);
+            if ( ar != null )
+            {
+                logger_.info("Processing response on an async result from " + message.getFrom());
+                ar.result(message.getMessageBody());
+            }
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectionKeyHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SelectionKeyHandler 
+{
+    public void modifyKey(SelectionKey key)
+    {
+        throw new UnsupportedOperationException("modifyKey() cannot be called on " + getClass().getName() + "!");
+    }
+    
+    public void modifyKeyForRead(SelectionKey key)
+    {
+        throw new UnsupportedOperationException("modifyKeyForRead() cannot be called on " + getClass().getName() + "!");
+    }
+    
+    public void modifyKeyForWrite(SelectionKey key)
+    {
+        throw new UnsupportedOperationException("modifyKeyForWrite() cannot be called on " + getClass().getName() + "!");
+    }
+    
+    /**
+     * Method which is called when the key becomes acceptable.
+     *
+     * @param key The key which is acceptable.
+     */
+    public void accept(SelectionKey key)
+    {
+         throw new UnsupportedOperationException("accept() cannot be called on " + getClass().getName() + "!");
+    }
+    
+    /**
+     * Method which is called when the key becomes connectable.
+     *
+     * @param key The key which is connectable.
+     */
+    public void connect(SelectionKey key)
+    {
+        throw new UnsupportedOperationException("connect() cannot be called on " + getClass().getName() + "!");
+    }
+    
+    /**
+     * Method which is called when the key becomes readable.
+     *
+     * @param key The key which is readable.
+     */
+    public void read(SelectionKey key)
+    {
+        throw new UnsupportedOperationException("read() cannot be called on " + getClass().getName() + "!");
+    }
+    
+    /**
+     * Method which is called when the key becomes writable.
+     *
+     * @param key The key which is writable.
+     */
+    public void write(SelectionKey key)
+    {
+        throw new UnsupportedOperationException("write() cannot be called on " + getClass().getName() + "!");
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/SelectorManager.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.TreeSet;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SelectorManager extends Thread
+{
+    private static final Logger logger_ = Logger.getLogger(SelectorManager.class); 
+    // the underlying selector used
+    /**
+     * DESCRIBE THE FIELD
+     */
+    protected Selector selector_;   
+    protected HashSet<SelectionKey> modifyKeysForRead_;    
+    protected HashSet<SelectionKey> modifyKeysForWrite_;
+    
+    // the list of keys waiting to be cancelled
+    protected HashSet<SelectionKey> cancelledKeys_;    
+
+    // The static selector manager which is used by all applications
+    private static SelectorManager manager_;
+    
+    // The static UDP selector manager which is used by all applications
+    private static SelectorManager udpManager_;
+
+    /**
+     * Constructor, which is private since there is only one selector per JVM.
+     * 
+     * @param profile
+     *            DESCRIBE THE PARAMETER
+     */
+    protected SelectorManager(String name)
+    {
+        super(name);                        
+        this.modifyKeysForRead_ = new HashSet<SelectionKey>();
+        this.modifyKeysForWrite_ = new HashSet<SelectionKey>();
+        this.cancelledKeys_ = new HashSet<SelectionKey>();
+
+        // attempt to create selector
+        try
+        {
+            selector_ = Selector.open();
+        }
+        catch (IOException e)
+        {
+            logger_.error("SEVERE ERROR (SelectorManager): Error creating selector "
+                            + e);
+        }
+
+        setDaemon(false);
+        start();
+    }
+
+    /**
+     * Method which asks the Selector Manager to add the given key to the
+     * cancelled set. If noone calls register on this key during the rest of
+     * this select() operation, the key will be cancelled. Otherwise, it will be
+     * returned as a result of the register operation.
+     * 
+     * @param key
+     *            The key to cancel
+     */
+    public void cancel(SelectionKey key)
+    {
+        if (key == null)
+        {
+            throw new NullPointerException();
+        }
+
+        synchronized ( cancelledKeys_ )
+        {
+            cancelledKeys_.add(key);
+        }
+    }
+
+    /**
+     * Registers a new channel with the selector, and attaches the given
+     * SelectionKeyHandler as the handler for the newly created key. Operations
+     * which the hanlder is interested in will be called as available.
+     * 
+     * @param channel
+     *            The channel to regster with the selector
+     * @param handler
+     *            The handler to use for the callbacks
+     * @param ops
+     *            The initial interest operations
+     * @return The SelectionKey which uniquely identifies this channel
+     * @exception IOException
+     *                DESCRIBE THE EXCEPTION
+     */
+    public SelectionKey register(SelectableChannel channel,
+            SelectionKeyHandler handler, int ops) throws IOException
+    {
+        if ((channel == null) || (handler == null))
+        {
+            throw new NullPointerException();
+        }
+
+        selector_.wakeup();
+        SelectionKey key = channel.register(selector_, ops, handler);
+        synchronized(cancelledKeys_)
+        {
+            cancelledKeys_.remove(key);
+        }
+        selector_.wakeup();
+        return key;
+    }      
+    
+    public void modifyKeyForRead(SelectionKey key)
+    {
+        if (key == null)
+        {
+            throw new NullPointerException();
+        }
+
+        synchronized(modifyKeysForRead_)
+        {
+            modifyKeysForRead_.add(key);
+        }
+        selector_.wakeup();
+    }
+    
+    public void modifyKeyForWrite(SelectionKey key)
+    {
+        if (key == null)
+        {
+            throw new NullPointerException();
+        }
+
+        synchronized( modifyKeysForWrite_ )
+        {
+            modifyKeysForWrite_.add(key);
+        }
+        selector_.wakeup();
+    }
+
+    /**
+     * This method starts the socket manager listening for events. It is
+     * designed to be started when this thread's start() method is invoked.
+     */
+    public void run()
+    {
+        try
+        {              
+            // loop while waiting for activity
+            while (true && !Thread.currentThread().interrupted() )
+            { 
+                try
+                {
+                    doProcess();                                                                          
+                    selector_.select(1000); 
+                    synchronized( cancelledKeys_ )
+                    {
+                        if (cancelledKeys_.size() > 0)
+                        {
+                            SelectionKey[] keys = cancelledKeys_.toArray( new SelectionKey[0]);                        
+                            
+                            for ( SelectionKey key : keys )
+                            {
+                                key.cancel();
+                                key.channel().close();
+                            }                                                
+                            cancelledKeys_.clear();
+                        }
+                    }
+                }
+                catch ( IOException e )
+                {
+                    logger_.warn(LogUtil.throwableToString(e));
+                }
+            }
+                         
+            manager_ = null;
+        }
+        catch (Throwable t)
+        {
+            logger_.error("ERROR (SelectorManager.run): " + t);
+            logger_.error(LogUtil.throwableToString(t));
+            System.exit(-1);
+        }
+    }    
+
+    protected void doProcess() throws IOException
+    {
+        doInvocationsForRead();
+        doInvocationsForWrite();
+        doSelections();
+    }
+    
+    /**
+     * DESCRIBE THE METHOD
+     * 
+     * @exception IOException
+     *                DESCRIBE THE EXCEPTION
+     */
+    protected void doSelections() throws IOException
+    {
+        SelectionKey[] keys = selectedKeys();
+
+        for (int i = 0; i < keys.length; i++)
+        {
+            selector_.selectedKeys().remove(keys[i]);
+
+            synchronized (keys[i])
+            {
+                SelectionKeyHandler skh = (SelectionKeyHandler) keys[i]
+                        .attachment();
+
+                if (skh != null)
+                {
+                    // accept
+                    if (keys[i].isValid() && keys[i].isAcceptable())
+                    {
+                        skh.accept(keys[i]);
+                    }
+
+                    // connect
+                    if (keys[i].isValid() && keys[i].isConnectable())
+                    {
+                        skh.connect(keys[i]);
+                    }
+
+                    // read
+                    if (keys[i].isValid() && keys[i].isReadable())
+                    {
+                        skh.read(keys[i]);
+                    }
+
+                    // write
+                    if (keys[i].isValid() && keys[i].isWritable())
+                    {
+                        skh.write(keys[i]);
+                    }
+                }
+                else
+                {
+                    keys[i].channel().close();
+                    keys[i].cancel();
+                }
+            }
+        }
+    }
+    
+    private void doInvocationsForRead()
+    {
+        Iterator<SelectionKey> it;
+        synchronized (modifyKeysForRead_)
+        {
+            it = new ArrayList<SelectionKey>(modifyKeysForRead_).iterator();
+            modifyKeysForRead_.clear();
+        }
+
+        while (it.hasNext())
+        {
+            SelectionKey key = it.next();
+            if (key.isValid() && (key.attachment() != null))
+            {
+                ((SelectionKeyHandler) key.attachment()).modifyKeyForRead(key);
+            }
+        }
+    }
+    
+    private void doInvocationsForWrite()
+    {
+        Iterator<SelectionKey> it;
+        synchronized (modifyKeysForWrite_)
+        {
+            it = new ArrayList<SelectionKey>(modifyKeysForWrite_).iterator();
+            modifyKeysForWrite_.clear();
+        }
+
+        while (it.hasNext())
+        {
+            SelectionKey key = it.next();
+            if (key.isValid() && (key.attachment() != null))
+            {
+                ((SelectionKeyHandler) key.attachment()).modifyKeyForWrite(key);
+            }
+        }
+    }
+
+    /**
+     * Selects all of the currenlty selected keys on the selector and returns
+     * the result as an array of keys.
+     * 
+     * @return The array of keys
+     * @exception IOException
+     *                DESCRIBE THE EXCEPTION
+     */
+    protected SelectionKey[] selectedKeys() throws IOException
+    {
+        return (SelectionKey[]) selector_.selectedKeys().toArray(
+                new SelectionKey[0]);
+    }
+    
+    /**
+     * Returns the SelectorManager applications should use.
+     * 
+     * @return The SelectorManager which applications should use
+     */
+    public static SelectorManager getSelectorManager()
+    {
+        synchronized (SelectorManager.class)
+        {
+            if (manager_ == null)
+            {
+                manager_ = new SelectorManager("TCP Selector Manager");
+            }            
+        }
+        return manager_;
+    }
+    
+    public static SelectorManager getUdpSelectorManager()
+    {
+        synchronized (SelectorManager.class)
+        {
+            if (udpManager_ == null)
+            {
+                udpManager_ = new SelectorManager("UDP Selector Manager");
+            }            
+        }
+        return udpManager_;
+    }
+    
+    /**
+     * Returns whether or not this thread of execution is the selector thread
+     * 
+     * @return Whether or not this is the selector thread
+     */
+    public static boolean isSelectorThread()
+    {
+        return (Thread.currentThread() == manager_);
+    }
+}