You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/02 07:13:17 UTC

svn commit: r749207 [3/12] - in /incubator/cassandra/src/org/apache/cassandra: loader/ locator/ net/ net/http/ net/io/ net/sink/ procedures/ service/ test/ tools/

Added: incubator/cassandra/src/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessagingService.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessagingService.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessagingService.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,743 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.net.*;
+import java.security.MessageDigest;
+import java.util.*;
+import java.nio.ByteBuffer;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.nio.channels.*;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.utils.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.xml.bind.*;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.IStage;
+import org.apache.cassandra.concurrent.MultiThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.http.HttpConnectionHandler;
+import org.apache.cassandra.net.io.SerializerType;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.Cachetable;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.ICachetable;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class MessagingService implements IMessagingService, MessagingServiceMBean
+{
+    /* Global debug flag, read and written through isDebugOn()/debugOn(). */
+    private static boolean debugOn_ = false;   
+    
+    /* Protocol version; packed into the message header (shifted left by 8) by packIt(). */
+    private static int version_ = 1;
+    //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
+    private static SerializerType serializerType_ = SerializerType.BINARY;
+    
+    /* Protocol magic written at the start of every packet; replaced in the constructor by MD5("FB-MESSAGING"). */
+    private static byte[] protocol_ = new byte[16];
+    /* Verb Handler for the Response */
+    public static final String responseVerbHandler_ = "RESPONSE";
+    /* Stage for responses. */
+    public static final String responseStage_ = "RESPONSE-STAGE";
+    /* Verbs owned by the messaging layer itself; see checkForReservedVerb(). */
+    private enum ReservedVerbs_ {RESPONSE};
+    
+    /* Name -> name table of the reserved verbs, filled in by the constructor. */
+    private static Map<String, String> reservedVerbs_ = new Hashtable<String, String>();
+    /* Indicate if we are currently streaming data to another node or receiving streaming data */
+    private static AtomicBoolean isStreaming_ = new AtomicBoolean(false);
+    
+    /* Callbacks (callbackMap_) and async results (taskCompletionMap_) awaiting a response, keyed by message Id */
+    private static ICachetable<String, IAsyncCallback> callbackMap_;
+    private static ICachetable<String, IAsyncResult> taskCompletionMap_;
+    
+    /* Manages the table of endpoints it is listening on */
+    private static Set<EndPoint> endPoints_;
+    
+    /* Selection keys of the server sockets we are listening on, keyed by local endpoint */
+    private static Map<EndPoint, SelectionKey> listenSockets_ = new HashMap<EndPoint, SelectionKey>();
+    
+    /* Lookup table for registering message handlers based on the verb. */
+    private static Map<String, IVerbHandler> verbHandlers_;
+    
+    /* NOTE(review): not referenced anywhere else in this class -- confirm whether it is still needed. */
+    private static Map<String, MulticastSocket> mCastMembership_ = new HashMap<String, MulticastSocket>();
+    
+    /* Thread pool ("MESSAGING-SERVICE-POOL") handed out by getReadExecutor() for socket read activities */
+    private static ExecutorService messageDeserializationExecutor_;
+    
+    /* Thread pool to handle messaging write activities; also the fallback executor in enqueueRunnable() */
+    private static ExecutorService messageSerializerExecutor_;
+    
+    /* Thread pool to handle deserialization of messages read from the socket. */
+    private static ExecutorService messageDeserializerExecutor_;
+    
+    /* Single-threaded pool ("MESSAGE-STREAMING-POOL") that runs the file streaming tasks queued by stream() */
+    private static ExecutorService streamExecutor_;
+    
+    /* Guards lazy creation of connection pools and re-creation of the singleton after shutdown. */
+    private final static ReentrantLock lock_ = new ReentrantLock();
+    /* Connection pools keyed by "from:to". */
+    private static Map<String, TcpConnectionManager> poolTable_ = new Hashtable<String, TcpConnectionManager>();
+    
+    /* Set by shutdown(); makes getMessagingInstance() build a fresh instance. */
+    private static boolean bShutdown_ = false;
+    
+    private static Logger logger_ = Logger.getLogger(MessagingService.class);
+    
+    /* Declared after the other statics: the constructor uses reservedVerbs_, which must already be initialized. */
+    private static IMessagingService messagingService_ = new MessagingService();
+    
+    /**
+     * Reports whether the messaging layer's debug flag is set.
+     *
+     * @return the current value of the static debug flag
+     */
+    public static boolean isDebugOn()
+    {
+        return debugOn_;
+    }
+    
+    /**
+     * Sets the messaging layer's debug flag.
+     *
+     * @param on new value of the flag returned by isDebugOn()
+     */
+    public static void debugOn(boolean on)
+    {
+        debugOn_ = on;
+    }
+    
+    /**
+     * Returns the serializer type currently in effect (BINARY unless changed
+     * through serializerType(String)).
+     *
+     * @return the active serializer type
+     */
+    public static SerializerType getSerializerType()
+    {
+        return serializerType_;
+    }
+    
+    /**
+     * Selects the serializer by name. The comparison ignores case and recognises
+     * "binary", "java" and "xml"; any other name leaves the current setting untouched.
+     *
+     * @param type name of the serializer to switch to; must not be null
+     */
+    public synchronized static void serializerType(String type)
+    {
+        SerializerType requested = null;
+        if ( type.equalsIgnoreCase("binary") )
+        {
+            requested = SerializerType.BINARY;
+        }
+        else if ( type.equalsIgnoreCase("java") )
+        {
+            requested = SerializerType.JAVA;
+        }
+        else if ( type.equalsIgnoreCase("xml") )
+        {
+            requested = SerializerType.XML;
+        }
+
+        // Unknown names are ignored on purpose: the previous serializer stays active.
+        if ( requested != null )
+        {
+            serializerType_ = requested;
+        }
+    }
+    
+    /**
+     * Returns the protocol version that packIt() and constructStreamHeader()
+     * write into the message header.
+     *
+     * @return the current protocol version
+     */
+    public static int getVersion()
+    {
+        return version_;
+    }
+    
+    /**
+     * Sets the protocol version written into subsequently built headers.
+     *
+     * @param version new protocol version; the header layout comment in packIt()
+     *                allots it 8 bits
+     */
+    public static void setVersion(int version)
+    {
+        version_ = version;
+    }
+    
+    /**
+     * Returns the process-wide messaging service instance.
+     * <p>
+     * If shutdown() has flagged the service as shut down, a fresh instance is
+     * created while holding lock_ (the flag is re-checked once the lock is held)
+     * and the flag is cleared again.
+     * <p>
+     * NOTE(review): bShutdown_ is a plain, non-volatile static that is first read
+     * here without holding lock_, and shutdown() sets it under the class monitor
+     * rather than lock_ -- confirm that cross-thread visibility is acceptable.
+     *
+     * @return the current messaging service singleton
+     */
+    public static IMessagingService getMessagingInstance()
+    {   
+    	if ( bShutdown_ )
+    	{
+            lock_.lock();
+            try
+            {
+                // Re-check under the lock so only one caller rebuilds the instance.
+                if ( bShutdown_ )
+                {
+            		messagingService_ = new MessagingService();
+            		bShutdown_ = false;
+                }
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+    	}
+        return messagingService_;
+    }
+    
+    /**
+     * Always fails, so that the singleton cannot be duplicated through cloning.
+     *
+     * @return never returns normally
+     * @throws CloneNotSupportedException on every call
+     */
+    public Object clone() throws CloneNotSupportedException
+    {
+        //Prevents the singleton from being cloned
+        throw new CloneNotSupportedException();
+    }
+
+    /**
+     * Builds the messaging service: (re)creates the static handler table, endpoint
+     * set, callback/result cachetables and the four thread pools, computes the
+     * protocol magic, and registers the RESPONSE verb handler and its stage.
+     * <p>
+     * Note that all of this state is static, so constructing a new instance (as
+     * getMessagingInstance() does after a shutdown) replaces the state shared by
+     * every caller.
+     */
+    protected MessagingService()
+    {        
+        for ( ReservedVerbs_ verbs : ReservedVerbs_.values() )
+        {
+            reservedVerbs_.put(verbs.toString(), verbs.toString());
+        }
+        verbHandlers_ = new HashMap<String, IVerbHandler>();        
+        endPoints_ = new HashSet<EndPoint>();
+        /*
+         * Leave callbacks in the cachetable long enough that any related messages will arrive
+         * before the callback is evicted from the table: entries are kept for twice the RPC
+         * timeout.
+         * NOTE(review): the original comment also stated that "the concurrency level is set at
+         * 128", the sum of the threads that add entries to the table and those that retrieve
+         * callbacks from it. No such setting is passed to Cachetable here -- confirm whether it
+         * still applies.
+        */ 
+        int maxSize = MessagingConfig.getMessagingThreadCount();
+        callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * DatabaseDescriptor.getRpcTimeout() );
+        taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * DatabaseDescriptor.getRpcTimeout() );        
+        
+        // Fixed-size pools (core == max == maxSize) backed by unbounded queues.
+        messageDeserializationExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+                maxSize,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl("MESSAGING-SERVICE-POOL")
+                );
+                
+        messageSerializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+                maxSize,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl("MESSAGE-SERIALIZER-POOL")
+                ); 
+        
+        messageDeserializerExecutor_ = new DebuggableThreadPoolExecutor( maxSize,
+                maxSize,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl("MESSAGE-DESERIALIZER-POOL")
+                ); 
+        
+        // Streaming uses a single thread, so queued streaming tasks run one at a time.
+        streamExecutor_ = new DebuggableThreadPoolExecutor( 1,
+                1,
+                Integer.MAX_VALUE,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(),
+                new ThreadFactoryImpl("MESSAGE-STREAMING-POOL")
+                ); 
+                
+        // The protocol magic is the MD5 digest of a fixed string (16 bytes).
+        protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());        
+        /* register the response verb handler */
+        registerVerbHandlers(MessagingService.responseVerbHandler_, new ResponseVerbHandler());
+        /* register stage for response */
+        StageManager.registerStage(MessagingService.responseStage_, new MultiThreadedStage("RESPONSE-STAGE", maxSize) );
+    }
+    
+    /**
+     * Computes a message digest of the given bytes.
+     *
+     * @param type name of the digest algorithm, as accepted by MessageDigest.getInstance
+     * @param data bytes to digest
+     * @return the digest, or null if anything goes wrong (the failure is only
+     *         logged at debug level)
+     */
+    public byte[] hash(String type, byte data[])
+    {
+        try
+        {
+            return MessageDigest.getInstance(type).digest(data);
+        }
+        catch(Exception e)
+        {
+            LogUtil.getLogger(MessagingService.class.getName()).debug(LogUtil.throwableToString(e));
+            return null;
+        }
+    }
+    
+    /**
+     * Reports the outstanding work of the serializer pool.
+     *
+     * @return tasks submitted to the serializer executor minus tasks it has completed
+     */
+    public long getMessagingSerializerTaskCount()
+    {
+        final DebuggableThreadPoolExecutor serializerPool = (DebuggableThreadPoolExecutor)messageSerializerExecutor_;
+        final long submitted = serializerPool.getTaskCount();
+        final long finished = serializerPool.getCompletedTaskCount();
+        return submitted - finished;
+    }
+    
+    /**
+     * Reports the outstanding work of the receive-side ("MESSAGING-SERVICE-POOL") executor.
+     *
+     * @return tasks submitted to that executor minus tasks it has completed
+     */
+    public long getMessagingReceiverTaskCount()
+    {
+        final DebuggableThreadPoolExecutor receiverPool = (DebuggableThreadPoolExecutor)messageDeserializationExecutor_;
+        final long submitted = receiverPool.getTaskCount();
+        final long finished = receiverPool.getCompletedTaskCount();
+        return submitted - finished;
+    }
+    
+    /**
+     * Starts accepting connections on the given local endpoint.
+     * <p>
+     * Opens a non-blocking server socket channel bound to the endpoint, registers it
+     * with the selector manager for OP_ACCEPT using an HTTP or TCP connection handler,
+     * and records the endpoint and its selection key. If any step fails, the channel
+     * is closed before the exception propagates so that the socket is not leaked.
+     *
+     * @param localEp local endpoint to bind to
+     * @param isHttp  true to handle connections with HttpConnectionHandler,
+     *                false to use TcpConnectionHandler
+     * @throws IOException if the channel cannot be opened, bound or configured
+     */
+    public void listen(EndPoint localEp, boolean isHttp) throws IOException
+    {
+        ServerSocketChannel serverChannel = ServerSocketChannel.open();
+        boolean registered = false;
+        try
+        {
+            ServerSocket ss = serverChannel.socket();
+            ss.bind(localEp.getInetAddress());
+            serverChannel.configureBlocking(false);
+
+            SelectionKeyHandler handler = null;
+            if ( isHttp )
+            {
+                handler = new HttpConnectionHandler();
+            }
+            else
+            {
+                handler = new TcpConnectionHandler(localEp);
+            }
+
+            SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);
+            endPoints_.add(localEp);
+            listenSockets_.put(localEp, key);
+            registered = true;
+        }
+        finally
+        {
+            if ( !registered )
+            {
+                // Setup failed part-way: release the channel, but let the original
+                // exception (not a secondary close failure) reach the caller.
+                try
+                {
+                    serverChannel.close();
+                }
+                catch ( IOException e )
+                {
+                    logger_.warn(LogUtil.throwableToString(e));
+                }
+            }
+        }
+    }
+    
+    /**
+     * Starts listening for UDP traffic on the port of the given endpoint.
+     * <p>
+     * An IOException from initialising the connection is logged as a warning and
+     * swallowed; in that case the endpoint is not added to the local endpoint set.
+     * <p>
+     * NOTE(review): the UdpConnection is not retained or closed here, including on
+     * the failure path -- presumably init() registers it elsewhere; confirm.
+     *
+     * @param localEp endpoint whose port should be listened on
+     */
+    public void listenUDP(EndPoint localEp)
+    {
+        UdpConnection connection = new UdpConnection();
+        logger_.debug("Starting to listen on " + localEp);
+        try
+        {
+            connection.init(localEp.getPort());
+            endPoints_.add(localEp);     
+        }
+        catch ( IOException e )
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+        }
+    }
+    
+    /**
+     * Returns the connection pool for the given pair of endpoints, creating and
+     * caching it on first use. Pools are keyed by "from:to".
+     *
+     * @param from local endpoint of the connections
+     * @param to   remote endpoint of the connections
+     * @return the (possibly newly created) pool for this endpoint pair
+     */
+    public static TcpConnectionManager getConnectionPool(EndPoint from, EndPoint to)
+    {
+        final String poolKey = from + ":" + to;
+        TcpConnectionManager pool = poolTable_.get(poolKey);
+        if ( pool != null )
+        {
+            return pool;
+        }
+
+        lock_.lock();
+        try
+        {
+            // Another thread may have created the pool while we waited for the lock.
+            pool = poolTable_.get(poolKey);
+            if ( pool == null )
+            {
+                pool = new TcpConnectionManager(MessagingConfig.getConnectionPoolInitialSize(),
+                        MessagingConfig.getConnectionPoolGrowthFactor(),
+                        MessagingConfig.getConnectionPoolMaxSize(), from, to);
+                poolTable_.put(poolKey, pool);
+            }
+            return pool;
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /**
+     * Takes a snapshot of every known connection pool.
+     *
+     * @return one ConnectionStatistics per pool (local endpoint, remote endpoint,
+     *         pool size, connections in use); empty if there are no pools
+     */
+    public static ConnectionStatistics[] getPoolStatistics()
+    {
+        Set<ConnectionStatistics> collected = new HashSet<ConnectionStatistics>();
+        for ( TcpConnectionManager pool : poolTable_.values() )
+        {
+            collected.add( new ConnectionStatistics(pool.getLocalEndPoint(), pool.getRemoteEndPoint(), pool.getPoolSize(), pool.getConnectionsInUse()) );
+        }
+        return collected.toArray(new ConnectionStatistics[0]);
+    }
+    
+    /**
+     * Obtains a connection from the pool for the given endpoint pair, creating the
+     * pool first if necessary (see getConnectionPool).
+     *
+     * @param from local endpoint
+     * @param to   remote endpoint
+     * @return a connection handed out by the pool
+     * @throws IOException if the pool cannot provide a connection
+     */
+    public static TcpConnection getConnection(EndPoint from, EndPoint to) throws IOException
+    {
+        return getConnectionPool(from, to).getConnection();
+    }
+    
+    /**
+     * Rejects an attempt to replace the handler of a reserved verb.
+     * <p>
+     * The check only fires when the verb is reserved AND already has a handler, so
+     * the first registration of a reserved verb (done by the constructor) succeeds.
+     *
+     * @param type verb about to be registered
+     * @throws IllegalArgumentException if the verb is reserved and already registered
+     */
+    private void checkForReservedVerb(String type)
+    {
+        if ( reservedVerbs_.get(type) == null )
+        {
+            return;
+        }
+        if ( verbHandlers_.get(type) == null )
+        {
+            return;
+        }
+        throw new IllegalArgumentException( type + " is a reserved verb handler. Scram!");
+    }
+    
+    /**
+     * Registers (or replaces) the handler for a verb.
+     *
+     * @param type        verb the handler is responsible for
+     * @param verbHandler handler to invoke for messages carrying that verb
+     * @throws IllegalArgumentException if the verb is reserved and already has a
+     *         handler (see checkForReservedVerb)
+     */
+    public void registerVerbHandlers(String type, IVerbHandler verbHandler)
+    {
+    	checkForReservedVerb(type);
+    	verbHandlers_.put(type, verbHandler);
+    }
+    
+    /**
+     * Removes every verb handler that belongs to the given endpoint.
+     * <p>
+     * Endpoint specific verb handlers can be distinguished because their keys
+     * contain the string form of the endpoint, so each key containing
+     * localEndPoint.toString() is removed.
+     *
+     * @param localEndPoint endpoint whose handlers should be dropped
+     */
+    public void deregisterAllVerbHandlers(EndPoint localEndPoint)
+    {
+        // Typed iterator: removal through the iterator is safe while walking the key set.
+        Iterator<String> keys = verbHandlers_.keySet().iterator();
+        while ( keys.hasNext() )
+        {
+            String key = keys.next();
+            if ( key.contains(localEndPoint.toString()) )
+                keys.remove();
+        }
+    }
+    
+    /**
+     * Removes the handler registered for the given verb, if any. No reserved-verb
+     * check is performed here.
+     *
+     * @param type verb whose handler should be removed
+     */
+    public void deregisterVerbHandlers(String type)
+    {
+        verbHandlers_.remove(type);
+    }
+
+    /**
+     * Looks up the handler registered for a verb.
+     *
+     * @param type verb to look up
+     * @return the registered handler, or null if none is registered
+     */
+    public IVerbHandler getVerbHandler(String type)
+    {
+        return verbHandlers_.get(type);
+    }
+
+    /**
+     * Sends the same message to several endpoints and registers a callback for the
+     * responses under the message's id.
+     *
+     * @param message message to send
+     * @param to      endpoints to send it to
+     * @param cb      callback to register for responses carrying this message id
+     * @return the id of the message, which is the key the callback is stored under
+     */
+    public String sendRR(Message message, EndPoint[] to, IAsyncCallback cb)
+    {
+        final String id = message.getMessageId();
+        // Register before sending so that a fast response always finds the callback.
+        callbackMap_.put(id, cb);
+        for ( EndPoint target : to )
+        {
+            sendOneWay(message, target);
+        }
+        return id;
+    }
+    
+    /**
+     * Sends a message to one endpoint and registers a callback for the response
+     * under the message's id. The callback is registered before the message is sent.
+     *
+     * @param message message to send
+     * @param to      destination endpoint
+     * @param cb      callback to register for responses carrying this message id
+     * @return the id of the message, which is the key the callback is stored under
+     */
+    public String sendRR(Message message, EndPoint to, IAsyncCallback cb)
+    {        
+        String messageId = message.getMessageId();
+        callbackMap_.put(messageId, cb);
+        sendOneWay(message, to);
+        return messageId;
+    }
+
+    /**
+     * Sends message i to endpoint i and registers a single callback for all the
+     * responses.
+     * <p>
+     * A fresh group id is generated and written into every message as its message
+     * id (overwriting whatever id it had), so that all responses resolve to the
+     * same callback.
+     *
+     * @param messages messages to send; modified in place (their ids are replaced)
+     * @param to       destination endpoints, one per message
+     * @param cb       callback to register under the group id
+     * @return the generated group id
+     * @throws IllegalArgumentException if the two arrays differ in length
+     */
+    public String sendRR(Message[] messages, EndPoint[] to, IAsyncCallback cb)
+    {
+        if ( messages.length != to.length )
+        {
+            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
+        }
+        String groupId = GuidGenerator.guid();
+        callbackMap_.put(groupId, cb);
+        for ( int i = 0; i < messages.length; ++i )
+        {
+            messages[i].setMessageId(groupId);
+            sendOneWay(messages[i], to[i]);
+        }
+        return groupId;
+    }    
+
+    /**
+     * Use this version for fire and forget style messaging.
+     * <p>
+     * A message whose sender equals the destination is delivered locally through
+     * receive() without touching the network; any other message is wrapped in a
+     * MessageSerializationTask and queued on the serializer executor.
+     *
+     * @param message message to send
+     * @param to      destination endpoint
+     */
+    public void sendOneWay(Message message, EndPoint to)
+    {        
+        // do local deliveries        
+        if ( message.getFrom().equals(to) )
+        {            
+            MessagingService.receive(message);
+            return;
+        }
+        
+        Runnable tcpWriteEvent = new MessageSerializationTask(message, to);
+        messageSerializerExecutor_.execute(tcpWriteEvent);    
+    }
+    
+    /**
+     * Sends a message to one endpoint and returns a handle on which the caller can
+     * wait for the response. The handle is stored under the message's id before
+     * the message is sent.
+     *
+     * @param message message to send
+     * @param to      destination endpoint
+     * @return a new AsyncResult registered for this message id
+     */
+    public IAsyncResult sendRR(Message message, EndPoint to)
+    {
+        IAsyncResult iar = new AsyncResult();
+        taskCompletionMap_.put(message.getMessageId(), iar);
+        sendOneWay(message, to);
+        return iar;
+    }
+    
+    /**
+     * Sends a message to the given endpoint over UDP, fire and forget.
+     * <p>
+     * A message whose sender equals the destination is delivered locally through
+     * receive(). Otherwise a UdpConnection is created for this one message and
+     * closed afterwards; an IOException while sending is logged as a warning and
+     * not propagated.
+     *
+     * @param message message to send
+     * @param to      destination endpoint
+     */
+    public void sendUdpOneWay(Message message, EndPoint to)
+    {
+        EndPoint from = message.getFrom();
+        // do local deliveries
+        if ( from.equals(to) )
+        {
+            MessagingService.receive(message);
+            return;
+        }
+
+        UdpConnection connection = null;
+        try
+        {
+            connection = new UdpConnection();
+            connection.init();
+            connection.write(message, to);
+        }
+        catch ( IOException e )
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+        }
+        finally
+        {
+            if ( connection != null )
+                connection.close();
+        }
+    }
+    
+    /**
+     * Queues a file (or a region of it) for asynchronous streaming to another node.
+     * <p>
+     * The streaming flag is set to true before the task is queued; this method does
+     * not reset it (see setStreamingMode).
+     *
+     * @param file          file to stream
+     * @param startPosition position in the file to start from
+     * @param total         amount to stream -- presumably a byte count; confirm against FileStreamTask
+     * @param from          local endpoint
+     * @param to            destination endpoint
+     */
+    public void stream(String file, long startPosition, long total, EndPoint from, EndPoint to)
+    {
+        isStreaming_.set(true);
+        /* Streaming asynchronously on streamExecutor_ threads. */
+        Runnable streamingTask = new FileStreamTask(file, startPosition, total, from, to);
+        streamExecutor_.execute(streamingTask);
+    }
+    
+    /**
+     * Lets the application determine whether we are currently streaming data.
+     * This would imply either streaming to a receiver, receiving streamed
+     * data or both.
+     *
+     * @return the current value of the streaming flag
+     */
+    public static boolean isStreaming()
+    {
+        return isStreaming_.get();
+    }
+    
+    /**
+     * Sets the streaming flag reported by isStreaming().
+     *
+     * @param bVal true while data is being streamed, false otherwise
+     */
+    public static void setStreamingMode(boolean bVal)
+    {
+        isStreaming_.set(bVal);
+    }
+    
+    /**
+     * Shuts the messaging service down.
+     * <p>
+     * Under the class monitor this cancels all listening selection keys, stops the
+     * four thread pools with shutdownNow(), shuts down both cachetables, interrupts
+     * the selector manager, clears the connection-pool and verb-handler tables, and
+     * finally sets the shutdown flag so that getMessagingInstance() builds a new
+     * instance on its next call.
+     * <p>
+     * NOTE(review): the local endpoint set (endPoints_) is not cleared here, and the
+     * connection pools are dropped from the table without being closed in this
+     * method -- confirm both are intended.
+     */
+    public static void shutdown()
+    {
+        logger_.info("Shutting down ...");
+        synchronized ( MessagingService.class )
+        {          
+            /* Stop listening on any socket */            
+            for( SelectionKey skey : listenSockets_.values() )
+            {
+                SelectorManager.getSelectorManager().cancel(skey);
+            }
+            listenSockets_.clear();
+            
+            /* Shutdown the threads in the EventQueue's */            
+            messageDeserializationExecutor_.shutdownNow();            
+            messageSerializerExecutor_.shutdownNow();
+            messageDeserializerExecutor_.shutdownNow();
+            streamExecutor_.shutdownNow();
+            
+            /* shut down the cachetables */
+            taskCompletionMap_.shutdown();
+            callbackMap_.shutdown();                        
+                        
+            /* Interrupt the selector manager thread */
+            SelectorManager.getSelectorManager().interrupt();
+            
+            poolTable_.clear();            
+            verbHandlers_.clear();                                    
+            /* Set last: getMessagingInstance() rebuilds the service once it sees this flag. */
+            bShutdown_ = true;
+        }
+        logger_.debug("Shutdown invocation complete.");
+    }
+
+    /**
+     * Hands an incoming (or locally delivered) message over for processing by
+     * queueing a MessageDeliveryTask on the stage named by the message's type.
+     *
+     * @param message message to deliver
+     */
+    public static void receive(Message message)
+    {        
+        enqueueRunnable(message.getMessageType(), new MessageDeliveryTask(message));
+    }
+    
+    /**
+     * Tells whether the given endpoint is one this service has started listening
+     * on through listen() or listenUDP().
+     *
+     * @param ep endpoint to test
+     * @return true if the endpoint is in the local endpoint set
+     */
+    public static boolean isLocalEndPoint(EndPoint ep)
+    {
+        return ( endPoints_.contains(ep) );
+    }
+        
+    private static void enqueueRunnable(String stageName, Runnable runnable){
+        
+        IStage stage = StageManager.getStage(stageName);   
+        
+        if ( stage != null )
+        {
+            logger_.info("Running on stage " + stage.getName());
+            stage.execute(runnable);
+        } 
+        else
+        {
+            logger_.info("Running on default stage - beware");
+            messageSerializerExecutor_.execute(runnable);
+        }
+    }    
+    
+    /**
+     * Looks up the callback registered by one of the sendRR overloads. The entry
+     * stays in the table.
+     *
+     * @param key message id (or group id) the callback was registered under
+     * @return the result of the cachetable lookup for that key
+     */
+    public static IAsyncCallback getRegisteredCallback(String key)
+    {
+        return callbackMap_.get(key);
+    }
+    
+    /**
+     * Removes the callback registered under the given id.
+     *
+     * @param key message id (or group id) the callback was registered under
+     */
+    public static void removeRegisteredCallback(String key)
+    {
+        callbackMap_.remove(key);
+    }
+    
+    /**
+     * Fetches the pending async result for a message id.
+     * <p>
+     * Despite the "get" name this REMOVES the entry from the table, so a second
+     * call with the same key will not return it again.
+     *
+     * @param key message id the result was registered under
+     * @return whatever the cachetable's remove() returns for that key
+     */
+    public static IAsyncResult getAsyncResult(String key)
+    {
+        return taskCompletionMap_.remove(key);
+    }
+    
+    /**
+     * Discards the pending async result registered under the given message id.
+     *
+     * @param key message id the result was registered under
+     */
+    public static void removeAsyncResult(String key)
+    {
+        taskCompletionMap_.remove(key);
+    }
+
+    /**
+     * Returns the protocol magic that prefixes every packet.
+     * <p>
+     * The internal array itself is returned, not a copy, so callers must not
+     * modify it.
+     *
+     * @return the protocol magic bytes
+     */
+    public static byte[] getProtocol()
+    {
+        return protocol_;
+    }
+    
+    /**
+     * @return the executor created with the "MESSAGING-SERVICE-POOL" thread factory
+     */
+    public static ExecutorService getReadExecutor()
+    {
+        return messageDeserializationExecutor_;
+    }
+    
+    /**
+     * @return the executor on which outgoing messages are serialized
+     *         ("MESSAGE-SERIALIZER-POOL")
+     */
+    public static ExecutorService getWriteExecutor()
+    {
+        return messageSerializerExecutor_;
+    }
+    
+    /**
+     * Note: the method name is misspelled ("Deserilization"); it is kept as is
+     * because callers refer to it by this name.
+     *
+     * @return the executor created with the "MESSAGE-DESERIALIZER-POOL" thread factory
+     */
+    public static ExecutorService getDeserilizationExecutor()
+    {
+        return messageDeserializerExecutor_;
+    }
+
+    /**
+     * Checks a received protocol magic against the one this service uses.
+     *
+     * @param protocol magic bytes read from a packet
+     * @return true if they equal this service's protocol magic
+     */
+    public static boolean isProtocolValid(byte[] protocol)
+    {
+        return isEqual(protocol_, protocol);
+    }
+    
+    /**
+     * Compares two digests for equality by delegating to MessageDigest.isEqual.
+     *
+     * @param digestA first digest
+     * @param digestB second digest
+     * @return true if both arrays hold the same bytes
+     */
+    public static boolean isEqual(byte digestA[], byte digestB[])
+    {
+        return MessageDigest.isEqual(digestA, digestB);
+    }
+
+    /**
+     * Encodes an int as 4 bytes, most significant byte first (big-endian).
+     *
+     * @param i value to encode
+     * @return a new 4-byte array holding the value
+     */
+    public static byte[] toByteArray(int i)
+    {
+        final byte[] encoded = new byte[4];
+        int remaining = i;
+        // Fill from the last position backwards, peeling off the low byte each time.
+        for ( int pos = encoded.length - 1; pos >= 0; pos-- )
+        {
+            encoded[pos] = (byte)(remaining & 0xff);
+            remaining >>>= 8;
+        }
+        return encoded;
+    }
+    
+    /**
+     * Encodes a short as 2 bytes, most significant byte first (big-endian).
+     *
+     * @param s value to encode
+     * @return a new 2-byte array holding the value
+     */
+    public static byte[] toByteArray(short s)
+    {
+        final byte high = (byte)(s >> 8);
+        final byte low = (byte)s;
+        return new byte[] { high, low };
+    }
+    
+    /**
+     * Decodes a big-endian short from the first two bytes of the array.
+     *
+     * @param bytes array holding at least 2 bytes
+     * @return the decoded value
+     * @throws IllegalArgumentException if the array holds fewer than 2 bytes
+     */
+    public static short byteArrayToShort(byte bytes[])
+    {
+        return byteArrayToShort(bytes, 0);
+    }
+    
+    /**
+     * Decodes a big-endian short from two bytes of the array, starting at offset.
+     *
+     * @param bytes  array to read from
+     * @param offset index of the most significant byte
+     * @return the decoded value
+     * @throws IllegalArgumentException if fewer than 2 bytes are available from offset
+     */
+    public static short byteArrayToShort(byte bytes[], int offset)
+    {
+        if ( bytes.length - offset < 2 )
+        {
+            throw new IllegalArgumentException("A short must be 2 bytes in size.");
+        }
+        // Mask each byte to undo sign extension before combining.
+        final int high = bytes[offset] & 0xff;
+        final int low = bytes[offset + 1] & 0xff;
+        return (short)((high << 8) | low);
+    }
+
+    /**
+     * Extracts an n-bit field from x whose highest bit is at position p (bit 0 being
+     * the least significant), returned right-aligned.
+     *
+     * @param x value to extract from
+     * @param p position of the field's most significant bit
+     * @param n width of the field in bits
+     * @return the field, shifted down to the low-order bits
+     */
+    public static int getBits(int x, int p, int n)
+    {
+        // Shift the field down so that its lowest bit lands on bit 0 ...
+        final int shift = (p + 1) - n;
+        // ... then keep only the n low-order bits.
+        final int mask = ~(-1 << n);
+        return (x >>> shift) & mask;
+    }
+    
+    /**
+     * Decodes a big-endian int from the first four bytes of the array.
+     *
+     * @param bytes array holding at least 4 bytes
+     * @return the decoded value
+     * @throws IllegalArgumentException if the array holds fewer than 4 bytes
+     */
+    public static int byteArrayToInt(byte bytes[])
+    {
+        return byteArrayToInt(bytes, 0);
+    }
+
+    /**
+     * Decodes a big-endian int from four bytes of the array, starting at offset.
+     *
+     * @param bytes  array to read from
+     * @param offset index of the most significant byte
+     * @return the decoded value
+     * @throws IllegalArgumentException if fewer than 4 bytes are available from offset
+     */
+    public static int byteArrayToInt(byte bytes[], int offset)
+    {
+        if ( bytes.length - offset < 4 )
+        {
+            throw new IllegalArgumentException("An integer must be 4 bytes in size.");
+        }
+        // Mask each byte to undo sign extension before placing it in its slot.
+        final int b0 = bytes[offset] & 0xff;
+        final int b1 = bytes[offset + 1] & 0xff;
+        final int b2 = bytes[offset + 2] & 0xff;
+        final int b3 = bytes[offset + 3] & 0xff;
+        return (b0 << 24) | (b1 << 16) | (b2 << 8) | b3;
+    }
+    
+    /**
+     * Frames a serialized message for the wire.
+     * <p>
+     * The returned buffer holds, in order: the protocol magic, the 4-byte protocol
+     * header, the 4-byte payload length and the payload itself. It is flipped, i.e.
+     * ready to be read from position 0.
+     *
+     * @param bytes     serialized message payload
+     * @param compress  sets the compression flag in the header
+     * @param stream    sets the streaming flag in the header
+     * @param listening sets the listening flag in the header
+     * @return a flipped buffer containing magic + header + length + payload
+     */
+    public static ByteBuffer packIt(byte[] bytes, boolean compress, boolean stream, boolean listening)
+    {
+        byte[] size = toByteArray(bytes.length);
+        /* 
+             Setting up the protocol header. This is 4 bytes long
+             represented as an integer. Counting from the least
+             significant bit: the first 2 bits indicate the serializer
+             type. The 3rd bit (value 4) indicates if compression is
+             turned on or off. It is turned off by default. The 4th bit
+             (value 8) indicates if we are in streaming mode. It is
+             turned off by default. The 5th bit (value 16) is used to
+             indicate that the sender is not listening on any well
+             defined port. This implies the receiver needs to cache the
+             connection using the port on the socket. The following 3
+             bits are reserved for future use. The next 8 bits indicate
+             a version number. The remaining 16 bits are not used
+             currently.
+             NOTE(review): the 2-bit serializer field assumes
+             SerializerType has at most four constants -- confirm.
+        */
+        int n = 0;
+        // Setting up the serializer bit
+        n |= serializerType_.ordinal();
+        // set compression bit.
+        if ( compress )
+            n |= 4;
+        
+        // set streaming bit
+        if ( stream )
+            n |= 8;
+        
+        // set listening 5th bit
+        if ( listening )
+            n |= 16;
+        
+        // Setting up the version bit 
+        n |= (version_ << 8);               
+        /* Finished the protocol header setup */
+               
+        byte[] header = toByteArray(n);
+        // 16 is the length of the protocol magic (an MD5 digest).
+        ByteBuffer buffer = ByteBuffer.allocate(16 + header.length + size.length + bytes.length);
+        buffer.put(protocol_);
+        buffer.put(header);
+        buffer.put(size);
+        buffer.put(bytes);
+        buffer.flip();
+        return buffer;
+    }
+        
+    /**
+     * Builds the header that precedes streamed data: the protocol magic followed
+     * by the 4-byte protocol header. Unlike packIt() there is no length field, no
+     * payload and no listening flag. The returned buffer is flipped, i.e. ready to
+     * be read from position 0.
+     *
+     * @param compress sets the compression flag in the header
+     * @param stream   sets the streaming flag in the header
+     * @return a flipped buffer containing magic + header
+     */
+    public static ByteBuffer constructStreamHeader(boolean compress, boolean stream)
+    {
+        /* 
+        Setting up the protocol header. This is 4 bytes long
+        represented as an integer. Counting from the least
+        significant bit: the first 2 bits indicate the serializer
+        type. The 3rd bit (value 4) indicates if compression is
+        turned on or off. It is turned off by default. The 4th bit
+        (value 8) indicates if we are in streaming mode. It is
+        turned off by default. The following 4 bits are reserved
+        for future use. The next 8 bits indicate a version number.
+        The remaining 16 bits are not used currently.
+        */
+        int n = 0;
+        // Setting up the serializer bit
+        n |= serializerType_.ordinal();
+        // set compression bit.
+        if ( compress )
+            n |= 4;
+       
+        // set streaming bit
+        if ( stream )
+            n |= 8;
+       
+        // Setting up the version bit 
+        n |= (version_ << 8);              
+        /* Finished the protocol header setup */
+              
+        byte[] header = toByteArray(n);
+        // 16 is the length of the protocol magic (an MD5 digest).
+        ByteBuffer buffer = ByteBuffer.allocate(16 + header.length);
+        buffer.put(protocol_);
+        buffer.put(header);
+        buffer.flip();
+        return buffer;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/MessagingServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/MessagingServiceMBean.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/MessagingServiceMBean.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/MessagingServiceMBean.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Management interface of the messaging service, exposing its task backlog.
+ */
+public interface MessagingServiceMBean
+{   
+    /**
+     * @return number of outstanding serializer tasks; MessagingService computes this
+     *         as tasks submitted to its serializer executor minus tasks completed
+     */
+    public long getMessagingSerializerTaskCount();
+    /**
+     * @return number of outstanding receiver tasks; MessagingService computes this
+     *         as tasks submitted to its "MESSAGING-SERVICE-POOL" executor minus
+     *         tasks completed
+     */
+    public long getMessagingReceiverTaskCount();
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/ProtocolHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/ProtocolHeader.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/ProtocolHeader.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/ProtocolHeader.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Plain holder for the fields of a protocol header.
+ * NOTE(review): the fields presumably mirror the flags that
+ * MessagingService.packIt() packs into the 4-byte header -- confirm against the
+ * code that parses incoming headers.
+ */
+public final class ProtocolHeader
+{
+    /* Names of header attributes. */
+    public static final String SERIALIZER = "SERIALIZER";
+    public static final String COMPRESSION = "COMPRESSION";
+    public static final String VERSION = "VERSION";
+    
+    /* Serializer type, as an int. */
+    public int serializerType_;
+    /* Whether the compression flag is set. */
+    public boolean isCompressed_;
+    /* Whether the streaming flag is set. */
+    public boolean isStreamingMode_;
+    /* Whether the listening flag is set. */
+    public boolean isListening_;
+    /* Protocol version. */
+    public int version_;
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/ResponseVerbHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/ResponseVerbHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/ResponseVerbHandler.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Handles response messages by routing them to whoever is waiting for them: a
+ * callback registered with the messaging service or, failing that, a pending
+ * async result.
+ */
+class ResponseVerbHandler implements IVerbHandler
+{
+    private static final Logger logger_ = Logger.getLogger( ResponseVerbHandler.class );
+
+    /**
+     * Delivers a response to the party registered under the message's id.
+     * A registered callback takes precedence; only when there is none is the
+     * pending async result looked up (which also removes it from the service's
+     * table). A response nobody is waiting for is dropped silently.
+     *
+     * @param message the response message
+     */
+    public void doVerb(Message message)
+    {
+        final String id = message.getMessageId();
+
+        final IAsyncCallback callback = MessagingService.getRegisteredCallback(id);
+        if ( callback != null )
+        {
+            logger_.info("Processing response on a callback from " + message.getFrom());
+            callback.response(message);
+            return;
+        }
+
+        final AsyncResult pending = (AsyncResult)MessagingService.getAsyncResult(id);
+        if ( pending == null )
+        {
+            return;
+        }
+        logger_.info("Processing response on an async result from " + message.getFrom());
+        pending.result(message.getMessageBody());
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/SelectionKeyHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/SelectionKeyHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/SelectionKeyHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/SelectionKeyHandler.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SelectionKeyHandler
+{
+    /**
+     * Builds the message used by every default implementation below.
+     *
+     * @param operation the operation name including parentheses, e.g. "read()"
+     * @return the message naming the operation and the runtime class
+     */
+    private String refusal(String operation)
+    {
+        return operation + " cannot be called on " + getClass().getName() + "!";
+    }
+
+    /**
+     * Default implementation; always throws UnsupportedOperationException.
+     *
+     * @param key The key to modify.
+     */
+    public void modifyKey(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("modifyKey()"));
+    }
+
+    /**
+     * Default implementation; always throws UnsupportedOperationException.
+     *
+     * @param key The key to modify for reading.
+     */
+    public void modifyKeyForRead(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("modifyKeyForRead()"));
+    }
+
+    /**
+     * Default implementation; always throws UnsupportedOperationException.
+     *
+     * @param key The key to modify for writing.
+     */
+    public void modifyKeyForWrite(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("modifyKeyForWrite()"));
+    }
+
+    /**
+     * Called when the key becomes acceptable. The default implementation
+     * always throws UnsupportedOperationException.
+     *
+     * @param key The key which is acceptable.
+     */
+    public void accept(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("accept()"));
+    }
+
+    /**
+     * Called when the key becomes connectable. The default implementation
+     * always throws UnsupportedOperationException.
+     *
+     * @param key The key which is connectable.
+     */
+    public void connect(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("connect()"));
+    }
+
+    /**
+     * Called when the key becomes readable. The default implementation
+     * always throws UnsupportedOperationException.
+     *
+     * @param key The key which is readable.
+     */
+    public void read(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("read()"));
+    }
+
+    /**
+     * Called when the key becomes writable. The default implementation
+     * always throws UnsupportedOperationException.
+     *
+     * @param key The key which is writable.
+     */
+    public void write(SelectionKey key)
+    {
+        throw new UnsupportedOperationException(refusal("write()"));
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/SelectorManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/SelectorManager.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/SelectorManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/SelectorManager.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.TreeSet;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class SelectorManager extends Thread
+{
+    private static final Logger logger_ = Logger.getLogger(SelectorManager.class); 
+    // the underlying selector used
+    /**
+     * The selector polled by this manager's thread; opened in the constructor.
+     */
+    protected Selector selector_;   
+    protected HashSet<SelectionKey> modifyKeysForRead_;    
+    protected HashSet<SelectionKey> modifyKeysForWrite_;
+    
+    // the list of keys waiting to be cancelled
+    protected HashSet<SelectionKey> cancelledKeys_;    
+
+    // The static selector manager which is used by all applications
+    private static SelectorManager manager_;
+    
+    // The static UDP selector manager which is used by all applications
+    private static SelectorManager udpManager_;
+
+    /**
+     * Creates the manager, opens its selector and starts the selector thread.
+     * The constructor is protected; callers obtain instances through
+     * getSelectorManager() or getUdpSelectorManager().
+     * 
+     * @param name
+     *            the name given to the selector thread
+     */
+    protected SelectorManager(String name)
+    {
+        super(name);                        
+        this.modifyKeysForRead_ = new HashSet<SelectionKey>();
+        this.modifyKeysForWrite_ = new HashSet<SelectionKey>();
+        this.cancelledKeys_ = new HashSet<SelectionKey>();
+
+        // attempt to create selector
+        try
+        {
+            selector_ = Selector.open();
+        }
+        catch (IOException e)
+        {
+            // selector_ stays null in this case and the thread is still started below.
+            // Pass the exception as the throwable so that its stack trace is logged.
+            logger_.error("SEVERE ERROR (SelectorManager): Error creating selector "
+                            + e, e);
+        }
+
+        setDaemon(false);
+        start();
+    }
+
+    /**
+     * Adds the given key to the set of keys waiting to be cancelled. The
+     * selector thread cancels the keys in that set and closes their channels
+     * after its next select(); register() removes a key from the set again.
+     * 
+     * @param key
+     *            The key to cancel
+     * @exception NullPointerException
+     *                if key is null
+     */
+    public void cancel(SelectionKey key)
+    {
+        if (null == key)
+            throw new NullPointerException();
+
+        synchronized ( cancelledKeys_ )
+        {
+            cancelledKeys_.add(key);
+        }
+    }
+
+    /**
+     * Registers a new channel with the selector, and attaches the given
+     * SelectionKeyHandler as the handler for the newly created key. Operations
+     * which the handler is interested in will be called as available.
+     * 
+     * @param channel
+     *            The channel to register with the selector
+     * @param handler
+     *            The handler to use for the callbacks
+     * @param ops
+     *            The initial interest operations
+     * @return The SelectionKey which uniquely identifies this channel
+     * @exception IOException
+     *                if registering the channel with the selector fails
+     * @exception NullPointerException
+     *                if channel or handler is null
+     */
+    public SelectionKey register(SelectableChannel channel,
+            SelectionKeyHandler handler, int ops) throws IOException
+    {
+        if ((channel == null) || (handler == null))
+        {
+            throw new NullPointerException();
+        }
+
+        // wake the selector thread before registering
+        // NOTE(review): presumably so that a select() in progress does not hold up the registration - confirm
+        selector_.wakeup();
+        SelectionKey key = channel.register(selector_, ops, handler);
+        // a key that has just been registered must not be cancelled by the selector thread
+        synchronized(cancelledKeys_)
+        {
+            cancelledKeys_.remove(key);
+        }
+        selector_.wakeup();
+        return key;
+    }      
+    
+    /**
+     * Queues the key so that the selector thread invokes its handler's
+     * modifyKeyForRead() on its next pass, then wakes the selector.
+     * 
+     * @param key
+     *            The key whose handler should be invoked
+     * @exception NullPointerException
+     *                if key is null
+     */
+    public void modifyKeyForRead(SelectionKey key)
+    {
+        if (null == key)
+            throw new NullPointerException();
+
+        synchronized(modifyKeysForRead_)
+        {
+            modifyKeysForRead_.add(key);
+        }
+        selector_.wakeup();
+    }
+    
+    /**
+     * Queues the key so that the selector thread invokes its handler's
+     * modifyKeyForWrite() on its next pass, then wakes the selector.
+     * 
+     * @param key
+     *            The key whose handler should be invoked
+     * @exception NullPointerException
+     *                if key is null
+     */
+    public void modifyKeyForWrite(SelectionKey key)
+    {
+        if (null == key)
+            throw new NullPointerException();
+
+        synchronized( modifyKeysForWrite_ )
+        {
+            modifyKeysForWrite_.add(key);
+        }
+        selector_.wakeup();
+    }
+
+    /**
+     * The selector loop; it runs when this thread's start() method is invoked.
+     * Each pass processes the queued key modifications and the currently
+     * selected keys, selects with a one second timeout, and then cancels the
+     * keys queued through cancel() and closes their channels. The loop ends
+     * when the thread is interrupted. An IOException inside a pass is logged
+     * and the loop continues; any other Throwable is logged and terminates
+     * the JVM.
+     */
+    public void run()
+    {
+        try
+        {              
+            // loop while waiting for activity
+            while ( !Thread.interrupted() )
+            { 
+                try
+                {
+                    doProcess();                                                                          
+                    selector_.select(1000); 
+                    synchronized( cancelledKeys_ )
+                    {
+                        if (cancelledKeys_.size() > 0)
+                        {
+                            SelectionKey[] keys = cancelledKeys_.toArray( new SelectionKey[0]);                        
+                            
+                            for ( SelectionKey key : keys )
+                            {
+                                key.cancel();
+                                key.channel().close();
+                            }                                                
+                            cancelledKeys_.clear();
+                        }
+                    }
+                }
+                catch ( IOException e )
+                {
+                    logger_.warn(LogUtil.throwableToString(e));
+                }
+            }
+
+            // Drop only the static reference that points at this instance, so that
+            // the UDP selector thread exiting does not discard the TCP manager.
+            synchronized (SelectorManager.class)
+            {
+                if (manager_ == this)
+                {
+                    manager_ = null;
+                }
+                if (udpManager_ == this)
+                {
+                    udpManager_ = null;
+                }
+            }
+        }
+        catch (Throwable t)
+        {
+            logger_.error("ERROR (SelectorManager.run): " + t);
+            logger_.error(LogUtil.throwableToString(t));
+            System.exit(-1);
+        }
+    }    
+
+    /**
+     * One unit of selector work: first the queued read-interest and
+     * write-interest invocations, then the handlers of the selected keys.
+     * 
+     * @exception IOException
+     *                propagated from doSelections()
+     */
+    protected void doProcess() throws IOException
+    {
+        doInvocationsForRead();
+        doInvocationsForWrite();
+        doSelections();
+    }
+    
+    /**
+     * Dispatches every currently selected key to its attached
+     * SelectionKeyHandler. Each key is removed from the selector's selected
+     * set and then, while synchronized on the key, the handler's accept,
+     * connect, read and write methods are called for each operation the key
+     * is ready for, re-checking validity before each call. A key without an
+     * attachment has its channel closed and is cancelled.
+     * 
+     * @exception IOException
+     *                if closing the channel of a key without a handler fails
+     */
+    protected void doSelections() throws IOException
+    {
+        for (SelectionKey ready : selectedKeys())
+        {
+            selector_.selectedKeys().remove(ready);
+
+            synchronized (ready)
+            {
+                SelectionKeyHandler handler = (SelectionKeyHandler) ready.attachment();
+
+                if (handler == null)
+                {
+                    // nobody to dispatch to
+                    ready.channel().close();
+                    ready.cancel();
+                    continue;
+                }
+
+                // accept
+                if (ready.isValid() && ready.isAcceptable())
+                    handler.accept(ready);
+
+                // connect
+                if (ready.isValid() && ready.isConnectable())
+                    handler.connect(ready);
+
+                // read
+                if (ready.isValid() && ready.isReadable())
+                    handler.read(ready);
+
+                // write
+                if (ready.isValid() && ready.isWritable())
+                    handler.write(ready);
+            }
+        }
+    }
+    
+    private void doInvocationsForRead()
+    {
+        Iterator<SelectionKey> it;
+        synchronized (modifyKeysForRead_)
+        {
+            it = new ArrayList<SelectionKey>(modifyKeysForRead_).iterator();
+            modifyKeysForRead_.clear();
+        }
+
+        while (it.hasNext())
+        {
+            SelectionKey key = it.next();
+            if (key.isValid() && (key.attachment() != null))
+            {
+                ((SelectionKeyHandler) key.attachment()).modifyKeyForRead(key);
+            }
+        }
+    }
+    
+    private void doInvocationsForWrite()
+    {
+        Iterator<SelectionKey> it;
+        synchronized (modifyKeysForWrite_)
+        {
+            it = new ArrayList<SelectionKey>(modifyKeysForWrite_).iterator();
+            modifyKeysForWrite_.clear();
+        }
+
+        while (it.hasNext())
+        {
+            SelectionKey key = it.next();
+            if (key.isValid() && (key.attachment() != null))
+            {
+                ((SelectionKeyHandler) key.attachment()).modifyKeyForWrite(key);
+            }
+        }
+    }
+
+    /**
+     * Selects all of the currenlty selected keys on the selector and returns
+     * the result as an array of keys.
+     * 
+     * @return The array of keys
+     * @exception IOException
+     *                DESCRIBE THE EXCEPTION
+     */
+    protected SelectionKey[] selectedKeys() throws IOException
+    {
+        return (SelectionKey[]) selector_.selectedKeys().toArray(
+                new SelectionKey[0]);
+    }
+    
+    /**
+     * Returns the SelectorManager applications should use, creating (and
+     * thereby starting) it on first use.
+     * 
+     * @return The SelectorManager which applications should use; never null
+     */
+    public static SelectorManager getSelectorManager()
+    {
+        synchronized (SelectorManager.class)
+        {
+            // Work on a local copy: run() resets manager_ to null when the selector
+            // thread exits, so re-reading the field after the check could yield null.
+            SelectorManager current = manager_;
+            if (current == null)
+            {
+                current = new SelectorManager("TCP Selector Manager");
+                manager_ = current;
+            }
+            return current;
+        }
+    }
+    
+    /**
+     * Returns the SelectorManager used for UDP, creating (and thereby
+     * starting) it on first use.
+     * 
+     * @return The UDP SelectorManager
+     */
+    public static SelectorManager getUdpSelectorManager()
+    {
+        synchronized (SelectorManager.class)
+        {
+            if (udpManager_ != null)
+            {
+                return udpManager_;
+            }
+            udpManager_ = new SelectorManager("UDP Selector Manager");
+            return udpManager_;
+        }
+    }
+    
+    /**
+     * Returns whether or not the calling thread is the selector thread.
+     * Only the TCP manager (manager_) is compared; the UDP manager's thread
+     * yields false.
+     * 
+     * @return Whether or not this is the selector thread
+     */
+    public static boolean isSelectorThread()
+    {
+        final Thread caller = Thread.currentThread();
+        return caller == manager_;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/TcpConnection.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/TcpConnection.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/TcpConnection.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.io.StartState;
+import org.apache.cassandra.net.io.TcpReader;
+import org.apache.cassandra.net.io.TcpReader.TcpReaderState;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.net.sink.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpConnection extends SelectionKeyHandler implements Comparable
+{
+    // logging and profiling.
+    private static Logger logger_ = Logger.getLogger(TcpConnection.class);  
+    private static ISerializer serializer_ = new FastSerializer();
+    private SocketChannel socketChannel_;
+    private SelectionKey key_;
+    private TcpConnectionManager pool_;
+    private boolean isIncoming_ = false;       
+    private TcpReader tcpReader_;    
+    private ReadWorkItem readWork_ = new ReadWorkItem(); 
+    private List<ByteBuffer> pendingWrites_ = new Vector<ByteBuffer>();  
+    private AtomicBoolean connected_ = new AtomicBoolean(false);    
+    private EndPoint localEp_;
+    private EndPoint remoteEp_;
+    boolean inUse_ = false;
+         
+    /* 
+     * Added for streaming support. We need the boolean
+     * to indicate that this connection is used for 
+     * streaming. The Condition and the Lock are used
+     * to signal the stream() that it can continue
+     * streaming when the socket becomes writable.
+    */
+    private boolean bStream_ = false;
+    private Lock lock_;
+    private Condition condition_;
+    
+    // used from getConnection - outgoing
+    /*
+     * Opens a non-blocking socket channel and starts connecting to the remote
+     * end point. If the connect has not completed yet the channel is registered
+     * for OP_CONNECT, otherwise it is registered for OP_READ and the connection
+     * is marked as connected. If any step after opening the channel fails, the
+     * channel is closed again before the exception propagates.
+    */
+    TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException
+    {          
+        socketChannel_ = SocketChannel.open();            
+        boolean registered = false;
+        try
+        {
+            socketChannel_.configureBlocking(false);        
+            pool_ = pool;
+        
+            localEp_ = from;
+            remoteEp_ = to;
+        
+            if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+            {
+                key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
+            }
+            else
+            {
+                key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+                connected_.set(true);     
+            }         
+            registered = true;
+        }
+        finally
+        {
+            if ( !registered )
+            {
+                /* do not leak the socket when the set up fails part way */
+                try
+                {
+                    socketChannel_.close();
+                }
+                catch ( IOException ignored )
+                {
+                    /* the original failure is the one worth reporting */
+                }
+            }
+        }
+    }
+    
+    /*
+     * Used for streaming purposes has no pooling semantics.
+     * The streaming state (lock, condition and flag) is set up before the
+     * channel is registered with the selector, because from that point on the
+     * selector thread may call connect() and with it resumeStreaming() on this
+     * object. If any step after opening the channel fails, the channel is
+     * closed again before the exception propagates.
+    */
+    TcpConnection(EndPoint from, EndPoint to) throws IOException
+    {
+        socketChannel_ = SocketChannel.open();               
+        boolean registered = false;
+        try
+        {
+            socketChannel_.configureBlocking(false);       
+        
+            localEp_ = from;
+            remoteEp_ = to;
+        
+            lock_ = new ReentrantLock();
+            condition_ = lock_.newCondition();
+            bStream_ = true;
+        
+            if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+            {
+                key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
+            }
+            else
+            {
+                key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+                connected_.set(true);     
+            }        
+            registered = true;
+        }
+        finally
+        {
+            if ( !registered )
+            {
+                /* do not leak the socket when the set up fails part way */
+                try
+                {
+                    socketChannel_.close();
+                }
+                catch ( IOException ignored )
+                {
+                    /* the original failure is the one worth reporting */
+                }
+            }
+        }
+    }
+    
+    /*
+     * Invoked by the TcpConnectionHandler for an accepted TCP connection:
+     * wraps the channel in a TcpConnection and registers it for reads.
+     * Note that the isIncoming argument is not consulted; the connection
+     * is always created as an incoming one.
+    */
+    static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+    {
+        new TcpConnection(socketChannel, localEp, true).registerReadInterest();
+    }
+    
+    private void registerReadInterest() throws IOException
+    {
+        key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+    }
+    
+    // used for incoming connections
+    /*
+     * Wraps an already accepted channel: switches it to non-blocking mode and
+     * marks the connection as connected. The remote end point and the selection
+     * key are not set here.
+    */
+    TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+    {
+        this.socketChannel_ = socketChannel;
+        this.socketChannel_.configureBlocking(false);
+        this.localEp_ = localEp;
+        this.isIncoming_ = isIncoming;
+        this.connected_.set(true);
+    }
+    
+    /* Returns the local end point of this connection. */
+    EndPoint getLocalEp()
+    {
+        return localEp_;
+    }
+    
+    /* Replaces the local end point of this connection. */
+    public void setLocalEp(EndPoint localEp)
+    {
+        localEp_ = localEp;
+    }
+
+    /* Returns the remote end point; it is not set by the incoming-connection constructor. */
+    public EndPoint getEndPoint() 
+    {
+        return remoteEp_;
+    }
+    
+    /* Returns the flag passed to the incoming-connection constructor; false otherwise. */
+    public boolean isIncoming()
+    {
+        return isIncoming_;
+    }    
+    
+    /* Returns the underlying socket channel. */
+    public SocketChannel getSocketChannel()
+    {
+        return socketChannel_;
+    }    
+    
+    /*
+     * Serializes and packs the message and sends it on this connection.
+     * A message that serializes to zero bytes is ignored. While earlier
+     * writes are still pending, or the connection is not yet established,
+     * the packet is only queued. Otherwise it is written directly; whatever
+     * the socket did not take is queued and write interest is requested.
+    */
+    public void write(Message message) throws IOException
+    {
+        final byte[] payload = serializer_.serialize(message);
+        if ( payload.length == 0 )
+            return;
+
+        final boolean listening = !message.getFrom().equals(EndPoint.randomLocalEndPoint_);
+        final ByteBuffer packet = MessagingService.packIt( payload , false, false, listening);
+        synchronized(this)
+        {
+            final boolean mustQueue = !pendingWrites_.isEmpty() || !connected_.get();
+            if ( mustQueue )
+            {
+                pendingWrites_.add(packet);
+                return;
+            }
+
+            logger_.debug("Sending packets of size " + payload.length);
+            socketChannel_.write(packet);
+
+            if ( !packet.hasRemaining() )
+                return;
+
+            /* partial write: keep the rest and ask to be told when the socket is writable */
+            pendingWrites_.add(packet);
+            if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+            {
+                SelectorManager.getSelectorManager().modifyKeyForWrite(key_);
+            }
+        }
+    }
+    
+    /*
+     * Streams the bytes [startPosition, endPosition) of the file to the remote
+     * end point. Only valid on a connection created for streaming; otherwise an
+     * IllegalStateException is thrown. If the connection is not established
+     * after three waits of two seconds an IOException is thrown. When the
+     * range starts at position 0 a stream header is sent once before the data.
+     * The file is closed when the method returns or throws.
+    */
+    public void stream(File file, long startPosition, long endPosition) throws IOException
+    {
+        if ( !bStream_ )
+            throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
+                
+        lock_.lock();        
+        try
+        {            
+            /* transfer at most 64MB in each attempt */
+            final int limit = 64*1024*1024;  
+            final long total = endPosition - startPosition;
+            /* keeps track of total number of bytes transferred */
+            long bytesWritten = 0L;                          
+            RandomAccessFile raf = new RandomAccessFile(file, "r");            
+            try
+            {
+                FileChannel fc = raf.getChannel();            
+            
+                /* 
+                 * If the connection is not yet established then wait for
+                 * the timeout period of 2 seconds. Attempt to reconnect 3 times and then 
+                 * bail with an IOException.
+                */
+                long waitTime = 2;
+                int retry = 0;
+                while ( !connected_.get() )
+                {
+                    if ( retry == 3 )
+                        throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
+                    waitToContinueStreaming(waitTime, TimeUnit.SECONDS);
+                    ++retry;
+                }
+            
+                /* the header goes out once, even if the first transfer moves no bytes */
+                boolean headerPending = ( startPosition == 0 );
+                while ( bytesWritten < total )
+                {                                
+                    if ( headerPending )
+                    {
+                        ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);                      
+                        socketChannel_.write(buffer);
+                        handleIncompleteWrite(buffer);
+                        headerPending = false;
+                    }
+                
+                    /* never ask for more than what is left of the requested range */
+                    long count = Math.min(limit, total - bytesWritten);
+                    /* returns the number of bytes transferred from file to the socket */
+                    long bytesTransferred = fc.transferTo(startPosition, count, socketChannel_);
+                    logger_.debug("Bytes transferred " + bytesTransferred);                
+                    bytesWritten += bytesTransferred;
+                    startPosition += bytesTransferred; 
+                    /*
+                     * If the number of bytes transferred is less than intended 
+                     * then we need to wait till socket becomes writeable again. 
+                    */
+                    if ( bytesTransferred < count )
+                    {                    
+                        if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+                        {                    
+                            SelectorManager.getSelectorManager().modifyKeyForWrite(key_);                     
+                        }
+                        waitToContinueStreaming();
+                    }
+                }
+            }
+            finally
+            {
+                /* closing the file also closes its channel */
+                raf.close();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }        
+    }
+    
+    private void handleIncompleteWrite(ByteBuffer buffer)
+    {
+        if (buffer.remaining() > 0) 
+        {            
+            pendingWrites_.add(buffer);
+            if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+            {                    
+                SelectorManager.getSelectorManager().modifyKeyForWrite(key_);                     
+            }
+            waitToContinueStreaming();
+        }     
+    }
+    
+    /*
+     * Waits on the streaming condition until signalled. The caller must hold
+     * lock_. An interrupt is logged and the thread's interrupt status is
+     * restored so that the caller can observe it.
+    */
+    private void waitToContinueStreaming()
+    {
+        try
+        {
+            condition_.await();
+        }
+        catch ( InterruptedException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+            /* await() cleared the flag when it threw; set it again */
+            Thread.currentThread().interrupt();
+        }
+    }
+    
+    /*
+     * Waits on the streaming condition until signalled or until the given
+     * time has elapsed. The caller must hold lock_. An interrupt is logged and
+     * the thread's interrupt status is restored so that the caller can
+     * observe it.
+    */
+    private void waitToContinueStreaming(long waitTime, TimeUnit tu)
+    {
+        try
+        {
+            condition_.await(waitTime, tu);
+        }
+        catch ( InterruptedException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+            /* await() cleared the flag when it threw; set it again */
+            Thread.currentThread().interrupt();
+        }
+    }
+    
+    /*
+     * Signals one thread waiting on the streaming condition. Does nothing
+     * when this connection is not in streaming mode.
+    */
+    private void resumeStreaming()
+    {
+        if ( bStream_ )
+        {
+            lock_.lock();
+            try
+            {
+                condition_.signal();
+            }
+            finally
+            {
+                lock_.unlock();
+            }
+        }
+    }
+    
+    /*
+     * Marks the connection as no longer in use and, when it belongs to a pool
+     * that contains it, decrements the pool's used count. Connections without
+     * a pool (pool_ is null) are only marked as unused.
+    */
+    public void close()
+    {
+        inUse_ = false;
+        /* pool_ is not set by the streaming and incoming constructors */
+        if ( pool_ != null && pool_.contains(this) )
+            pool_.decUsed();               
+    }
+
+    /* Reports whether the underlying socket channel is connected. */
+    public boolean isConnected()
+    {
+        final boolean channelConnected = socketChannel_.isConnected();
+        return channelConnected;
+    }
+    
+    public boolean equals(Object o)
+    {
+        if ( !(o instanceof TcpConnection) )
+            return false;
+        
+        TcpConnection rhs = (TcpConnection)o;        
+        if ( localEp_.equals(rhs.localEp_) && remoteEp_.equals(rhs.remoteEp_) )
+            return true;
+        else
+            return false;
+    }
+    
+    /* Hash of the string "<local end point>:<remote end point>". */
+    public int hashCode()
+    {
+        final String endPoints = localEp_ + ":" + remoteEp_;
+        return endPoints.hashCode();
+    }
+
+    /* The string form of the underlying socket channel. */
+    public String toString()
+    {
+        final String channelText = socketChannel_.toString();
+        return channelText;
+    }
+    
+    void closeSocket()
+    {
+        logger_.warn("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");            
+        if ( pool_ != null )
+        {
+            pool_.removeConnection(this);
+        }
+        cancel(key_);
+        pendingWrites_.clear();
+    }
+    
+    /*
+     * Shuts the connection down after an error: discards the pending writes,
+     * queues the selection key for cancellation, discards the pending writes
+     * once more, and finally removes the connection from its pool (if it
+     * has one).
+    */
+    void errorClose()
+    {
+        logger_.warn("Closing down connection " + socketChannel_);
+        pendingWrites_.clear();
+        cancel(key_);
+        /* cleared a second time, exactly as before the cancel */
+        pendingWrites_.clear();
+        if ( null != pool_ )
+            pool_.removeConnection(this);
+    }
+    
+    /* Hands a non-null key to the TCP selector manager for cancellation; a null key is ignored. */
+    private void cancel(SelectionKey key)
+    {
+        if ( key == null )
+            return;
+        SelectorManager.getSelectorManager().cancel(key);
+    }
+    
+    // called in the selector thread
+    /*
+     * Finishes a pending connect. Connect interest is removed from the key
+     * first. On success read interest is requested, the connection is marked
+     * as connected, write interest is requested when writes are already
+     * queued, and a waiting stream() is signalled. When the connect cannot be
+     * finished, or fails with an IOException, the connection is closed through
+     * errorClose().
+    */
+    public void connect(SelectionKey key)
+    {       
+        key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));        
+        try
+        {
+            if (socketChannel_.finishConnect())
+            {                                
+                SelectorManager.getSelectorManager().modifyKeyForRead(key);
+                connected_.set(true);                
+                
+                // this will flush the pending                
+                if (!pendingWrites_.isEmpty()) 
+                {                    
+                    SelectorManager.getSelectorManager().modifyKeyForWrite(key_);  
+                } 
+                // wakes up a stream() call that is waiting for the connection
+                resumeStreaming();
+            } 
+            else 
+            {  
+                logger_.warn("Closing connection because socket channel could not finishConnect.");;
+                errorClose();
+            }
+        } 
+        catch(IOException e) 
+        {               
+            logger_.warn("Encountered IOException on connection: "  + socketChannel_);
+            logger_.warn( LogUtil.throwableToString(e) );
+            errorClose();
+        }
+    }
+    
+    // called in the selector thread
+    /*
+     * Invoked when the socket is writable: removes write interest from the
+     * key, flushes as many pending writes as the socket accepts, and then
+     * signals a waiting stream(). The signal is a no-op unless the connection
+     * is in streaming mode.
+    */
+    public void write(SelectionKey key)
+    {
+        final int withoutWrite = key.interestOps() & ~SelectionKey.OP_WRITE;
+        key.interestOps( withoutWrite );
+        doPendingWrites();
+        resumeStreaming();
+    }
+    
+    /*
+     * Writes the queued buffers to the socket in order. A buffer is removed
+     * from the queue only once it has been written completely; the loop stops
+     * at the first buffer the socket does not take in full. An IOException
+     * closes the connection through errorClose(). In every case write interest
+     * is requested again when buffers remain queued and it is not already set.
+    */
+    void doPendingWrites()
+    {
+        try
+        {                     
+            while(!pendingWrites_.isEmpty()) 
+            {
+                ByteBuffer buffer = pendingWrites_.get(0);
+                socketChannel_.write(buffer);                    
+                if (buffer.remaining() > 0) 
+                {   
+                    // the socket is full; keep the buffer at the head of the queue
+                    break;
+                }               
+                pendingWrites_.remove(0);                    
+            } 
+            
+        }
+        catch(IOException ex)
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+            // This is to fix the weird Linux bug with NIO.
+            errorClose();
+        }
+        finally
+        {    
+            // same monitor that write(Message) holds while it queues or sends
+            synchronized(this)
+            {
+                if (!pendingWrites_.isEmpty() && (key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+                {                    
+                    SelectorManager.getSelectorManager().modifyKeyForWrite(key_); 
+                }  
+            }
+        }
+    }
+    
+    // called in the selector thread
+    /*
+     * Invoked when the socket is readable: removes read interest from the key
+     * and hands the read work item to the read executor.
+    */
+    public void read(SelectionKey key)
+    {
+        final int withoutRead = key.interestOps() & ~SelectionKey.OP_READ;
+        key.interestOps( withoutRead );
+        // publish this event onto the TCPReadEvent Queue.
+        MessagingService.getReadExecutor().execute(readWork_);
+    }
+    
+    /*
+     * Adds read interest to the given key. The current interest set is read
+     * from the same key that is updated, not from the key_ field, which may
+     * not have been assigned yet when the selector thread gets here.
+    */
+    public void modifyKeyForRead(SelectionKey key)
+    {
+        key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+    }
+    
+    /*
+     * Adds write interest to the given key. The current interest set is read
+     * from the same key that is updated, not from the key_ field, which may
+     * not have been assigned yet when the selector thread gets here.
+    */
+    public void modifyKeyForWrite(SelectionKey key)
+    {        
+        key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+    }
+    
+    /**
+     * Runnable that reads from this connection's socket through tcpReader_.
+     * Each non-empty byte array returned by the reader is either submitted to
+     * the deserialization executor (normal mode) or, when the protocol header
+     * says the connection is in streaming mode, causes streaming mode to be
+     * switched off and the socket to be closed. When the try block finishes,
+     * normally or not, read interest is requested again for key_.
+     */
+    class ReadWorkItem implements Runnable
+    {
+        // called from the TCP READ thread pool
+        public void run()
+        {
+            /* Lazily create the reader and put it into its PREAMBLE state. */
+            if ( tcpReader_ == null )
+            {
+                tcpReader_ = new TcpReader(TcpConnection.this);
+                StartState preamble = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
+                if ( preamble == null )
+                {
+                    preamble = new ProtocolState(tcpReader_);
+                    tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, preamble);
+                }
+                tcpReader_.morphState(preamble);
+            }
+
+            try
+            {
+                /* Keep going for as long as the reader hands back data. */
+                for ( byte[] bytes = tcpReader_.read(); bytes.length > 0; bytes = tcpReader_.read() )
+                {
+                    ProtocolHeader pH = tcpReader_.getProtocolHeader();
+                    if ( pH.isStreamingMode_ )
+                    {
+                        MessagingService.setStreamingMode(false);
+                        /* Close this socket connection  used for streaming */
+                        closeSocket();
+                        continue;
+                    }
+
+                    /* first message received */
+                    if ( remoteEp_ == null )
+                    {
+                        int port;
+                        if ( pH.isListening_ )
+                        {
+                            port = DatabaseDescriptor.getStoragePort();
+                        }
+                        else
+                        {
+                            port = EndPoint.randomPort_;
+                        }
+                        String remoteHost = socketChannel_.socket().getInetAddress().getHostName();
+                        remoteEp_ = new EndPoint( remoteHost, port );
+                        // put connection into pool if possible
+                        pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);
+                        pool_.addToPool(TcpConnection.this);
+                    }
+
+                    /* Deserialize and handle the message */
+                    MessagingService.getDeserilizationExecutor().submit( new MessageDeserializationTask(pH.serializerType_, bytes) );
+                    tcpReader_.resetState();
+                }
+            }
+            catch ( Throwable th )
+            {
+                /* IOException and every other failure are handled identically. */
+                handleException(th);
+            }
+            finally
+            {
+                SelectorManager.getSelectorManager().modifyKeyForRead(key_);
+            }
+        }
+
+        /**
+         * Logs the failure together with the affected channel and closes the
+         * connection through errorClose().
+         */
+        private void handleException(Throwable th)
+        {
+            logger_.warn("Problem reading from socket connected to : " + socketChannel_);
+            logger_.warn(LogUtil.throwableToString(th));
+            // Closing here works around what the original authors describe
+            // as a weird Linux bug with NIO.
+            errorClose();
+        }
+    }
+    
+    /**
+     * @return the number of buffers currently held in pendingWrites_
+     */
+    public int pending()
+    {
+        int queued = pendingWrites_.size();
+        return queued;
+    }
+    
+    /**
+     * Orders connections by the size of their pending-write queues.
+     *
+     * @param o the connection to compare against
+     * @return this connection's queue size minus the other connection's
+     * @throws IllegalArgumentException if o is not a TcpConnection
+     */
+    public int compareTo(Object o)
+    {
+        if ( !(o instanceof TcpConnection) )
+        {
+            throw new IllegalArgumentException();
+        }
+
+        TcpConnection other = (TcpConnection) o;
+        return pendingWrites_.size() - other.pendingWrites_.size();
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionHandler.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionHandler.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionHandler.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+import java.io.IOException;
+import java.net.*;
+
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Selection key handler whose accept(SelectionKey) takes a pending connection
+ * from the key's ServerSocketChannel and passes it to
+ * TcpConnection.acceptConnection together with the local end point.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpConnectionHandler extends SelectionKeyHandler
+{
+    private static Logger logger_ = Logger.getLogger(TcpConnectionHandler.class);
+    EndPoint localEp_;
+
+    /**
+     * @param localEp local end point handed to every accepted connection
+     */
+    public TcpConnectionHandler(EndPoint localEp)
+    {
+        localEp_ = localEp;
+    }
+
+    /**
+     * Accepts one connection from the server channel behind the given key.
+     * Nothing happens when the channel has no connection to hand out; an
+     * IOException is logged as a warning and not propagated.
+     *
+     * @param key selection key whose channel is a ServerSocketChannel
+     */
+    public void accept(SelectionKey key)
+    {
+        ServerSocketChannel listener = (ServerSocketChannel) key.channel();
+        try
+        {
+            SocketChannel accepted = listener.accept();
+            if ( accepted == null )
+            {
+                return;
+            }
+            TcpConnection.acceptConnection(accepted, localEp_, true);
+        }
+        catch ( IOException ioe )
+        {
+            logger_.warn(LogUtil.throwableToString(ioe));
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionManager.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/TcpConnectionManager.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import org.apache.log4j.Logger;
+/**
+ * Pool of TcpConnection objects between one local and one remote end point.
+ * getConnection() hands out the connection with the fewest pending writes and
+ * creates new connections while the pool holds fewer than maxSize entries.
+ *
+ * Structural changes made by getConnection, addToPool and shutdown are
+ * serialized by lock_. removeConnection, getPoolSize and contains rely only on
+ * the per-call synchronization of the underlying Vector, so the methods that
+ * walk the pool work on a snapshot instead of the live list.
+ *
+ * NOTE(review): inUse_ is a plain int. incUsed()/decUsed() are not atomic;
+ * confirm that callers outside getConnection() cannot update it concurrently.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class TcpConnectionManager
+{
+    private Lock lock_ = new ReentrantLock();
+    private List<TcpConnection> allConnections_;
+    private EndPoint localEp_;
+    private EndPoint remoteEp_;
+    private int initialSize_;
+    private int growthFactor_;
+    private int maxSize_;
+    private long lastTimeUsed_;
+    private boolean isShut_;
+
+    private int inUse_;
+
+    /**
+     * @param initialSize  stored but not otherwise used by this class
+     * @param growthFactor stored but not otherwise used by this class
+     * @param maxSize      maximum number of connections kept in the pool
+     * @param localEp      local end point of every pooled connection
+     * @param remoteEp     remote end point of every pooled connection
+     */
+    TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
+    {
+        initialSize_ = initialSize;
+        growthFactor_ = growthFactor;
+        maxSize_ = maxSize;
+        localEp_ = localEp;
+        remoteEp_ = remoteEp;
+        isShut_ = false;
+        lastTimeUsed_ = System.currentTimeMillis();
+        // Vector: several accessors below touch the list without holding lock_.
+        allConnections_ = new Vector<TcpConnection>();
+    }
+
+    /**
+     * Returns a connection to the remote end point and marks it as in use.
+     *
+     * The least loaded pooled connection is reused when it has no pending
+     * writes or when the pool is already full. Otherwise (including when the
+     * pool is empty) a new connection is created and added to the pool.
+     *
+     * @return the connection to use, or null in the corner case where a newly
+     *         created connection was rejected as a duplicate and the pool
+     *         turned out to be empty
+     * @throws IOException if a new connection cannot be established
+     */
+    TcpConnection getConnection() throws IOException
+    {
+        lock_.lock();
+        try
+        {
+            TcpConnection least = getLeastLoaded();
+
+            // Guard against a null least before dereferencing it: the pool can
+            // shrink underneath us because removeConnection() takes no lock.
+            if ( least != null && ( least.pending() == 0 || allConnections_.size() >= maxSize_ ) )
+            {
+                return markInUse(least);
+            }
+
+            TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
+            if ( !contains(connection) )
+            {
+                addToPool(connection);
+                return markInUse(connection);
+            }
+
+            // An equal connection is already pooled: drop the new one and fall
+            // back to the least loaded connection, marked like every other
+            // connection handed out by this method.
+            connection.closeSocket();
+            TcpConnection fallback = getLeastLoaded();
+            return ( fallback != null ) ? markInUse(fallback) : null;
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /* Flags the connection as in use, bumps the usage counter and returns it. */
+    private TcpConnection markInUse(TcpConnection connection)
+    {
+        connection.inUse_ = true;
+        incUsed();
+        return connection;
+    }
+
+    /* Copies the pool into an array in one synchronized Vector call. */
+    private TcpConnection[] snapshot()
+    {
+        return allConnections_.toArray(new TcpConnection[0]);
+    }
+
+    /**
+     * Finds the pooled connection with the fewest pending writes. Ties are
+     * resolved in favour of the connection that comes first in the pool. The
+     * pool itself is not reordered.
+     *
+     * @return the least loaded connection, or null if the pool is empty
+     */
+    protected TcpConnection getLeastLoaded()
+    {
+        lock_.lock();
+        try
+        {
+            TcpConnection least = null;
+            int leastPending = 0;
+            for ( TcpConnection candidate : snapshot() )
+            {
+                // Read pending() once per connection; it can change while we scan.
+                int candidatePending = candidate.pending();
+                if ( least == null || candidatePending < leastPending )
+                {
+                    least = candidate;
+                    leastPending = candidatePending;
+                }
+            }
+            return least;
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /**
+     * Removes the connection from the pool if it is present. Does not take
+     * lock_; only the Vector's own synchronization applies.
+     */
+    void removeConnection(TcpConnection connection)
+    {
+        allConnections_.remove(connection);
+    }
+
+    void incUsed()
+    {
+        inUse_++;
+    }
+
+    void decUsed()
+    {
+        inUse_--;
+    }
+
+    int getConnectionsInUse()
+    {
+        return inUse_;
+    }
+
+    /**
+     * Adds the connection to the pool unless it is already pooled. If the pool
+     * already holds maxSize connections the socket of the given connection is
+     * closed instead.
+     */
+    void addToPool(TcpConnection connection)
+    {
+        lock_.lock();
+        try
+        {
+            // Checked under the lock so that two concurrent callers cannot
+            // both pass the test and add the same connection twice.
+            if ( contains(connection) )
+                return;
+
+            if ( allConnections_.size() < maxSize_ )
+            {
+                allConnections_.add(connection);
+            }
+            else
+            {
+                connection.closeSocket();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /**
+     * Removes every connection from the pool, closing each socket, and then
+     * records that the manager has been shut down.
+     */
+    void shutdown()
+    {
+        lock_.lock();
+        try
+        {
+            while ( allConnections_.size() > 0 )
+            {
+                TcpConnection connection = allConnections_.remove(0);
+                connection.closeSocket();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        isShut_ = true;
+    }
+
+    int getPoolSize()
+    {
+        return allConnections_.size();
+    }
+
+    EndPoint getLocalEndPoint()
+    {
+        return localEp_;
+    }
+
+    EndPoint getRemoteEndPoint()
+    {
+        return remoteEp_;
+    }
+
+    /**
+     * @return the sum of pending() over the connections pooled at the time of
+     *         the call
+     */
+    int getPendingWrites()
+    {
+        int total = 0;
+        lock_.lock();
+        try
+        {
+            // Iterate over a snapshot: removeConnection() may shrink the live
+            // list at any time without holding lock_.
+            for ( TcpConnection connection : snapshot() )
+            {
+                total += connection.pending();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        return total;
+    }
+
+    boolean contains(TcpConnection connection)
+    {
+        return allConnections_.contains(connection);
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/UdpConnection.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/UdpConnection.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/UdpConnection.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.net.SocketAddress;
+import java.nio.*;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Non-blocking UDP end point built on a DatagramChannel. Outgoing messages are
+ * serialized and prefixed with a 4 byte protocol marker; incoming datagrams
+ * must carry the same marker, otherwise they are dropped.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class UdpConnection extends SelectionKeyHandler
+{
+    private static Logger logger_ = Logger.getLogger(UdpConnection.class);
+    /* Size of the receive buffer; SOURCE does not show how larger datagrams are handled. */
+    private static final int BUFFER_SIZE = 4096;
+    /* Marker written in front of every outgoing message and expected on every incoming one. */
+    private static final int protocol_ = 0xBADBEEF;
+
+    private DatagramChannel socketChannel_;
+    private SelectionKey key_;
+    private EndPoint localEndPoint_;
+
+    /**
+     * Opens an unbound, non-blocking channel. The channel is not registered
+     * with a selector, so key_ stays unset.
+     */
+    public void init() throws IOException
+    {
+        socketChannel_ = DatagramChannel.open();
+        socketChannel_.socket().setReuseAddress(true);
+        socketChannel_.configureBlocking(false);
+    }
+
+    /**
+     * Opens a non-blocking channel bound to the given local port and registers
+     * it for read events with the UDP selector manager.
+     *
+     * @param port local port to bind to
+     * @throws IOException if the channel cannot be opened, bound or configured
+     */
+    public void init(int port) throws IOException
+    {
+        // TODO: get TCP port from config and add one.
+        localEndPoint_ = new EndPoint(port);
+        socketChannel_ = DatagramChannel.open();
+        // SO_REUSEADDR must be set before bind(): changing it on a socket that
+        // is already bound has no defined effect.
+        socketChannel_.socket().setReuseAddress(true);
+        socketChannel_.socket().bind(localEndPoint_.getInetAddress());
+        socketChannel_.configureBlocking(false);
+        key_ = SelectorManager.getUdpSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+    }
+
+    /**
+     * Serializes the message, prefixes it with the protocol marker and sends
+     * it as one datagram.
+     *
+     * @param message message to send
+     * @param to      destination end point
+     * @return false if the channel reported that zero bytes were sent; true
+     *         otherwise, including when the message serialized to zero bytes
+     *         and nothing was sent at all
+     * @throws IOException if serialization or the send fails
+     */
+    public boolean write(Message message, EndPoint to) throws IOException
+    {
+        boolean bVal = true;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        Message.serializer().serialize(message, dos);
+        byte[] data = bos.toByteArray();
+        if ( data.length > 0 )
+        {
+            logger_.debug("Size of Gossip packet " + data.length);
+            byte[] protocol = BasicUtilities.intToByteArray(protocol_);
+            ByteBuffer buffer = ByteBuffer.allocate(data.length + protocol.length);
+            buffer.put( protocol );
+            buffer.put(data);
+            buffer.flip();
+
+            int n  = socketChannel_.send(buffer, to.getInetAddress());
+            if ( n == 0 )
+            {
+                bVal = false;
+            }
+        }
+        return bVal;
+    }
+
+    /**
+     * Closes the channel if one was opened. A failure to close is logged and
+     * not propagated.
+     */
+    void close()
+    {
+        try
+        {
+            if ( socketChannel_ != null )
+                socketChannel_.close();
+        }
+        catch ( IOException ex )
+        {
+            logger_.error( LogUtil.throwableToString(ex) );
+        }
+    }
+
+    public DatagramChannel getDatagramChannel()
+    {
+        return socketChannel_;
+    }
+
+    /**
+     * Consumes the 4 byte protocol marker from the buffer and returns the
+     * bytes that follow it.
+     *
+     * @param buffer received datagram, positioned at its first byte
+     * @return the message body, or an empty array when the datagram is too
+     *         short to hold the marker or the marker does not match
+     */
+    private byte[] gobbleHeaderAndExtractBody(ByteBuffer buffer)
+    {
+        byte[] protocol = new byte[4];
+        if ( buffer.remaining() < protocol.length )
+        {
+            // Datagrams come from the network and may be arbitrarily short;
+            // reading the marker would throw BufferUnderflowException.
+            logger_.info("Incoming message is too short to carry a protocol header: " + buffer.remaining() + " bytes");
+            return new byte[0];
+        }
+
+        buffer.get(protocol, 0, protocol.length);
+        int value = BasicUtilities.byteArrayToInt(protocol);
+        if ( protocol_ != value )
+        {
+            logger_.info("Invalid protocol header in the incoming message " + value);
+            return new byte[0];
+        }
+
+        byte[] body = new byte[buffer.remaining()];
+        buffer.get(body, 0, body.length);
+        return body;
+    }
+
+    /**
+     * Receives one datagram, strips the protocol marker, deserializes the
+     * body and passes the resulting message to MessagingService.receive.
+     * Read interest is cleared while the datagram is processed and restored
+     * on the same key afterwards. An IOException is logged and not propagated.
+     *
+     * @param key selection key on which the read event was reported
+     */
+    public void read(SelectionKey key)
+    {
+        key.interestOps( key.interestOps() & (~SelectionKey.OP_READ) );
+        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+        try
+        {
+            SocketAddress sa = socketChannel_.receive(buffer);
+            if ( sa == null )
+            {
+                logger_.debug("*** No datagram packet was available to be read ***");
+                return;
+            }
+            buffer.flip();
+
+            byte[] bytes = gobbleHeaderAndExtractBody(buffer);
+            if ( bytes.length > 0 )
+            {
+                DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+                Message message = Message.serializer().deserialize(dis);
+                if ( message != null )
+                {
+                    MessagingService.receive(message);
+                }
+            }
+        }
+        catch ( IOException ioe )
+        {
+            logger_.warn(LogUtil.throwableToString(ioe));
+        }
+        finally
+        {
+            // Restore read interest on the key it was cleared from above.
+            key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java?rev=749207&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java Mon Mar  2 06:13:14 2009
@@ -0,0 +1,94 @@
+package org.apache.cassandra.net.http;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.SuperColumn;
+
+
+/**
+ * Formatter that renders a column family as HTML. Summary lines are appended
+ * to the StringBuilder passed to each method, while the table markup is
+ * produced through the inherited helpers (startTable, addHeader, addCol, ...).
+ *
+ * NOTE(review): names and values are emitted as-is; whether the inherited
+ * helpers escape HTML is not visible here. Column values are turned into text
+ * with the platform default charset. Confirm both are intended.
+ */
+public class ColumnFamilyFormatter extends HTMLFormatter
+{
+    public ColumnFamilyFormatter()
+    {
+        super();
+    }
+
+    public ColumnFamilyFormatter(StringBuilder sb)
+    {
+        super(sb);
+    }
+
+    /**
+     * Appends the row key followed by the rendering of its column family.
+     *
+     * @param sb   builder receiving the summary lines
+     * @param sKey row key to print
+     * @param cf   column family to print
+     */
+    public void printKeyColumnFamily(StringBuilder sb, String sKey, ColumnFamily cf)
+    {
+        // print the key
+        sb.append("Key = ").append(sKey).append("<br>");
+
+        // print the column family
+        printColumnFamily(sb, cf);
+    }
+
+    /**
+     * Appends the column family name and renders its columns, as super
+     * columns when the configured column type is "Super" and as simple
+     * columns otherwise.
+     */
+    public void printColumnFamily(StringBuilder sb, ColumnFamily cf)
+    {
+        // first print the column family specific data
+        sb.append("ColumnFamily = ").append(cf.name()).append("<br>");
+
+        String familyType = DatabaseDescriptor.getColumnType(cf.name());
+        Collection<IColumn> columns = cf.getAllColumns();
+        if ( !"Super".equals(familyType) )
+        {
+            printSimpleColumns(sb, columns);
+        }
+        else
+        {
+            printSuperColumns(sb, columns);
+        }
+    }
+
+    /**
+     * Appends the number of super columns and renders one header plus one
+     * row of sub columns per super column. Every element of cols must be a
+     * SuperColumn.
+     */
+    public void printSuperColumns(StringBuilder sb, Collection<IColumn> cols)
+    {
+        // print the super column summary
+        sb.append("Number of super columns = ").append(cols.size()).append("<br>");
+
+        startTable();
+        for ( IColumn superCol : cols )
+        {
+            addHeader(superCol.name());
+            startRow();
+            SuperColumn asSuper = (SuperColumn) superCol;
+            Collection<IColumn> subColumns = asSuper.getSubColumns();
+            printSimpleColumns(sb, subColumns);
+            endRow();
+        }
+        endTable();
+    }
+
+    /**
+     * Renders the columns as a table with one header per column name and a
+     * single row holding the column values.
+     */
+    public void printSimpleColumns(StringBuilder sb, Collection<IColumn> cols)
+    {
+        int numColumns = cols.size();
+        String[] names = new String[numColumns];
+        String[] values = new String[numColumns];
+
+        // collect names and values in iteration order
+        int next = 0;
+        for ( IColumn col : cols )
+        {
+            names[next] = col.name();
+            values[next] = new String(col.value());
+            next++;
+        }
+
+        startTable();
+        addHeaders(names);
+        startRow();
+        for ( String value : values )
+        {
+            addCol(value);
+        }
+        endRow();
+        endTable();
+    }
+}