You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [24/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnection.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,570 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.*;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.io.FastSerializer;
+import org.apache.cassandra.net.io.ISerializer;
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.io.StartState;
+import org.apache.cassandra.net.io.TcpReader;
+import org.apache.cassandra.net.io.TcpReader.TcpReaderState;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.net.sink.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpConnection extends SelectionKeyHandler implements Comparable
+{
+    // logging and profiling.
+    private static Logger logger_ = Logger.getLogger(TcpConnection.class);  
+    private static ISerializer serializer_ = new FastSerializer();
+    private SocketChannel socketChannel_;
+    private SelectionKey key_;
+    private TcpConnectionManager pool_;
+    private boolean isIncoming_ = false;       
+    private TcpReader tcpReader_;    
+    private ReadWorkItem readWork_ = new ReadWorkItem(); 
+    private List<ByteBuffer> pendingWrites_ = new Vector<ByteBuffer>();  
+    private AtomicBoolean connected_ = new AtomicBoolean(false);    
+    private EndPoint localEp_;
+    private EndPoint remoteEp_;
+    boolean inUse_ = false;
+         
+    /* 
+     * Added for streaming support. We need the boolean
+     * to indicate that this connection is used for 
+     * streaming. The Condition and the Lock are used
+     * to signal the stream() that it can continue
+     * streaming when the socket becomes writable.
+    */
+    private boolean bStream_ = false;
+    private Lock lock_;
+    private Condition condition_;
+    
+    // used from getConnection - outgoing
+    TcpConnection(TcpConnectionManager pool, EndPoint from, EndPoint to) throws IOException
+    {          
+        socketChannel_ = SocketChannel.open();            
+        socketChannel_.configureBlocking(false);        
+        pool_ = pool;
+        
+        localEp_ = from;
+        remoteEp_ = to;
+        
+        if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+        {
+            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
+        }
+        else
+        {
+            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+            connected_.set(true);     
+        }         
+    }
+    
+    /*
+     * Used for streaming purposes has no pooling semantics.
+    */
+    TcpConnection(EndPoint from, EndPoint to) throws IOException
+    {
+        socketChannel_ = SocketChannel.open();               
+        socketChannel_.configureBlocking(false);       
+        
+        localEp_ = from;
+        remoteEp_ = to;
+        
+        if ( !socketChannel_.connect( remoteEp_.getInetAddress() ) )
+        {
+            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_CONNECT);
+        }
+        else
+        {
+            key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+            connected_.set(true);     
+        }        
+        bStream_ = true;
+        lock_ = new ReentrantLock();
+        condition_ = lock_.newCondition();
+    }
+    
+    /*
+     * This method is invoked by the TcpConnectionHandler to accept incoming TCP connections.
+     * Accept the connection and then register interest for reads.
+    */
+    static void acceptConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+    {
+        TcpConnection tcpConnection = new TcpConnection(socketChannel, localEp, true);
+        tcpConnection.registerReadInterest();
+    }
+    
+    private void registerReadInterest() throws IOException
+    {
+        key_ = SelectorManager.getSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+    }
+    
+    // used for incoming connections
+    TcpConnection(SocketChannel socketChannel, EndPoint localEp, boolean isIncoming) throws IOException
+    {       
+        socketChannel_ = socketChannel;
+        socketChannel_.configureBlocking(false);                           
+        isIncoming_ = isIncoming;
+        connected_.set(true);       
+        localEp_ = localEp;           
+    }
+    
+    EndPoint getLocalEp()
+    {
+        return localEp_;
+    }
+    
+    public void setLocalEp(EndPoint localEp)
+    {
+        localEp_ = localEp;
+    }
+
+    public EndPoint getEndPoint() 
+    {
+        return remoteEp_;
+    }
+    
+    public boolean isIncoming()
+    {
+        return isIncoming_;
+    }    
+    
+    public SocketChannel getSocketChannel()
+    {
+        return socketChannel_;
+    }    
+    
+    public void write(Message message) throws IOException
+    {           
+        byte[] data = serializer_.serialize(message);        
+        if ( data.length > 0 )
+        {    
+            boolean listening = ( message.getFrom().equals(EndPoint.randomLocalEndPoint_) ) ? false : true;
+            ByteBuffer buffer = MessagingService.packIt( data , false, false, listening);   
+            synchronized(this)
+            {
+                if (!pendingWrites_.isEmpty() || !connected_.get()) 
+                {                     
+                    pendingWrites_.add(buffer);                
+                    return;
+                }
+                
+                logger_.debug("Sending packets of size " + data.length);            
+                socketChannel_.write(buffer);                
+                
+                if (buffer.remaining() > 0) 
+                {                   
+                    pendingWrites_.add(buffer);
+                    if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+                    {                                    
+                        SelectorManager.getSelectorManager().modifyKeyForWrite(key_);                     
+                    }
+                }
+            }
+        }
+    }
+    
+    public void stream(File file, long startPosition, long endPosition) throws IOException
+    {
+        if ( !bStream_ )
+            throw new IllegalStateException("Cannot stream since we are not set up to stream data.");
+                
+        lock_.lock();        
+        try
+        {            
+            /* transfer 64MB in each attempt */
+            int limit = 64*1024*1024;  
+            long total = endPosition - startPosition;
+            /* keeps track of total number of bytes transferred */
+            long bytesWritten = 0L;                          
+            RandomAccessFile raf = new RandomAccessFile(file, "r");            
+            FileChannel fc = raf.getChannel();            
+            
+            /* 
+             * If the connection is not yet established then wait for
+             * the timeout period of 2 seconds. Attempt to reconnect 3 times and then 
+             * bail with an IOException.
+            */
+            long waitTime = 2;
+            int retry = 0;
+            while ( !connected_.get() )
+            {
+                if ( retry == 3 )
+                    throw new IOException("Unable to connect to " + remoteEp_ + " after " + retry + " attempts.");
+                waitToContinueStreaming(waitTime, TimeUnit.SECONDS);
+                ++retry;
+            }
+            
+            while ( bytesWritten < total )
+            {                                
+                if ( startPosition == 0 )
+                {
+                    ByteBuffer buffer = MessagingService.constructStreamHeader(false, true);                      
+                    socketChannel_.write(buffer);
+                    handleIncompleteWrite(buffer);
+                }
+                
+                /* returns the number of bytes transferred from file to the socket */
+                long bytesTransferred = fc.transferTo(startPosition, limit, socketChannel_);
+                logger_.debug("Bytes transferred " + bytesTransferred);                
+                bytesWritten += bytesTransferred;
+                startPosition += bytesTransferred; 
+                /*
+                 * If the number of bytes transferred is less than intended 
+                 * then we need to wait till socket becomes writeable again. 
+                */
+                if ( bytesTransferred < limit && bytesWritten != total )
+                {                    
+                    if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+                    {                    
+                        SelectorManager.getSelectorManager().modifyKeyForWrite(key_);                     
+                    }
+                    waitToContinueStreaming();
+                }
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }        
+    }
+    
+    private void handleIncompleteWrite(ByteBuffer buffer)
+    {
+        if (buffer.remaining() > 0) 
+        {            
+            pendingWrites_.add(buffer);
+            if ((key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+            {                    
+                SelectorManager.getSelectorManager().modifyKeyForWrite(key_);                     
+            }
+            waitToContinueStreaming();
+        }     
+    }
+    
+    private void waitToContinueStreaming()
+    {
+        try
+        {
+            condition_.await();
+        }
+        catch ( InterruptedException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+        }
+    }
+    
+    private void waitToContinueStreaming(long waitTime, TimeUnit tu)
+    {
+        try
+        {
+            condition_.await(waitTime, tu);
+        }
+        catch ( InterruptedException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+        }
+    }
+    
+    private void resumeStreaming()
+    {
+        /* if not in streaming mode do nothing */
+        if ( !bStream_ )
+            return;
+        
+        lock_.lock();
+        try
+        {
+            condition_.signal();
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+    
+    public void close()
+    {
+        inUse_ = false;
+        if ( pool_.contains(this) )
+            pool_.decUsed();               
+    }
+
+    public boolean isConnected()
+    {
+        return socketChannel_.isConnected();
+    }
+    
+    public boolean equals(Object o)
+    {
+        if ( !(o instanceof TcpConnection) )
+            return false;
+        
+        TcpConnection rhs = (TcpConnection)o;        
+        if ( localEp_.equals(rhs.localEp_) && remoteEp_.equals(rhs.remoteEp_) )
+            return true;
+        else
+            return false;
+    }
+    
+    public int hashCode()
+    {
+        return (localEp_ + ":" + remoteEp_).hashCode();
+    }
+
+    public String toString()
+    {        
+        return socketChannel_.toString();
+    }
+    
+    void closeSocket()
+    {
+        logger_.warn("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");            
+        if ( pool_ != null )
+        {
+            pool_.removeConnection(this);
+        }
+        cancel(key_);
+        pendingWrites_.clear();
+    }
+    
+    void errorClose() 
+    {        
+        logger_.warn("Closing down connection " + socketChannel_);
+        pendingWrites_.clear();
+        cancel(key_);
+        pendingWrites_.clear();        
+        if ( pool_ != null )
+        {
+            pool_.removeConnection(this);            
+        }
+    }
+    
+    private void cancel(SelectionKey key)
+    {
+        if ( key != null )
+            SelectorManager.getSelectorManager().cancel(key);
+    }
+    
+    // called in the selector thread
+    public void connect(SelectionKey key)
+    {       
+        key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));        
+        try
+        {
+            if (socketChannel_.finishConnect())
+            {                                
+                SelectorManager.getSelectorManager().modifyKeyForRead(key);
+                connected_.set(true);                
+                
+                // this will flush the pending                
+                if (!pendingWrites_.isEmpty()) 
+                {                    
+                    SelectorManager.getSelectorManager().modifyKeyForWrite(key_);  
+                } 
+                resumeStreaming();
+            } 
+            else 
+            {  
+                logger_.warn("Closing connection because socket channel could not finishConnect.");;
+                errorClose();
+            }
+        } 
+        catch(IOException e) 
+        {               
+            logger_.warn("Encountered IOException on connection: "  + socketChannel_);
+            logger_.warn( LogUtil.throwableToString(e) );
+            errorClose();
+        }
+    }
+    
+    // called in the selector thread
+    public void write(SelectionKey key)
+    {   
+        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );                
+        doPendingWrites();
+        /*
+         * This is executed only if we are in streaming mode.
+         * Idea is that we read a chunk of data from a source
+         * and wait to read the next from the source until we 
+         * are siganlled to do so from here. 
+        */
+         resumeStreaming();        
+    }
+    
+    void doPendingWrites()
+    {
+        try
+        {                     
+            while(!pendingWrites_.isEmpty()) 
+            {
+                ByteBuffer buffer = pendingWrites_.get(0);
+                socketChannel_.write(buffer);                    
+                if (buffer.remaining() > 0) 
+                {   
+                    break;
+                }               
+                pendingWrites_.remove(0);                    
+            } 
+            
+        }
+        catch(IOException ex)
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+            // This is to fix the wierd Linux bug with NIO.
+            errorClose();
+        }
+        finally
+        {    
+            synchronized(this)
+            {
+                if (!pendingWrites_.isEmpty() && (key_.interestOps() & SelectionKey.OP_WRITE) == 0)
+                {                    
+                    SelectorManager.getSelectorManager().modifyKeyForWrite(key_); 
+                }  
+            }
+        }
+    }
+    
+    // called in the selector thread
+    public void read(SelectionKey key)
+    {          
+        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );         
+        // publish this event onto to the TCPReadEvent Queue.
+        MessagingService.getReadExecutor().execute(readWork_);
+    }
+    
+    public void modifyKeyForRead(SelectionKey key)
+    {
+        key.interestOps( key_.interestOps() | SelectionKey.OP_READ );
+    }
+    
+    public void modifyKeyForWrite(SelectionKey key)
+    {        
+        key.interestOps( key_.interestOps() | SelectionKey.OP_WRITE );
+    }
+    
+    class ReadWorkItem implements Runnable
+    {                 
+        // called from the TCP READ thread pool
+        public void run()
+        {                         
+            if ( tcpReader_ == null )
+            {
+                tcpReader_ = new TcpReader(TcpConnection.this);    
+                StartState nextState = tcpReader_.getSocketState(TcpReader.TcpReaderState.PREAMBLE);
+                if ( nextState == null )
+                {
+                    nextState = new ProtocolState(tcpReader_);
+                    tcpReader_.putSocketState(TcpReader.TcpReaderState.PREAMBLE, nextState);
+                }
+                tcpReader_.morphState(nextState);
+            }
+            
+            try
+            {           
+                byte[] bytes = new byte[0];
+                while ( (bytes = tcpReader_.read()).length > 0 )
+                {                       
+                    ProtocolHeader pH = tcpReader_.getProtocolHeader();                    
+                    if ( !pH.isStreamingMode_ )
+                    {
+                        /* first message received */
+                        if (remoteEp_ == null)
+                        {             
+                            int port = ( pH.isListening_ ) ? DatabaseDescriptor.getStoragePort() : EndPoint.randomPort_;
+                            remoteEp_ = new EndPoint( socketChannel_.socket().getInetAddress().getHostName(), port );                            
+                            // put connection into pool if possible
+                            pool_ = MessagingService.getConnectionPool(localEp_, remoteEp_);                            
+                            pool_.addToPool(TcpConnection.this);                            
+                        }
+                        
+                        /* Deserialize and handle the message */
+                        MessagingService.getDeserilizationExecutor().submit( new MessageDeserializationTask(pH.serializerType_, bytes) );                                                  
+                        tcpReader_.resetState();
+                    }
+                    else
+                    {
+                        MessagingService.setStreamingMode(false);
+                        /* Close this socket connection  used for streaming */
+                        closeSocket();
+                    }                    
+                }
+            }
+            catch ( IOException ex )
+            {                   
+                handleException(ex);
+            }
+            catch ( Throwable th )
+            {
+                handleException(th);
+            }
+            finally
+            {                                     
+                SelectorManager.getSelectorManager().modifyKeyForRead(key_);                
+            }
+        }
+        
+        private void handleException(Throwable th)
+        {
+            logger_.warn("Problem reading from socket connected to : " + socketChannel_);
+            logger_.warn(LogUtil.throwableToString(th));
+            // This is to fix the wierd Linux bug with NIO.
+            errorClose();
+        }
+    }
+    
+    public int pending()
+    {
+        return pendingWrites_.size();
+    }
+    
+    public int compareTo(Object o)
+    {
+        if (o instanceof TcpConnection) 
+        {
+            return pendingWrites_.size() - ((TcpConnection) o).pendingWrites_.size();            
+        }
+                    
+        throw new IllegalArgumentException();
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.nio.channels.*;
+import java.io.IOException;
+import java.net.*;
+
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class TcpConnectionHandler extends SelectionKeyHandler
+{
+    private static Logger logger_ = Logger.getLogger(TcpConnectionHandler.class);
+    /* Local end point that is handed to every accepted connection. */
+    EndPoint localEp_;
+
+    public TcpConnectionHandler(EndPoint localEp)
+    {
+        localEp_ = localEp;
+    }
+
+    /**
+     * Accepts a pending connection on the server channel of the given key and
+     * hands it to TcpConnection.acceptConnection. Failures are logged and the
+     * accepted channel, if any, is closed so that it is not leaked.
+     *
+     * @param key selection key whose channel is a ServerSocketChannel
+     */
+    public void accept(SelectionKey key)
+    {
+        SocketChannel client = null;
+        try
+        {
+            ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
+            client = serverChannel.accept();
+
+            /* accept() returns null when no connection is pending on a non-blocking channel */
+            if ( client != null )
+            {
+                TcpConnection.acceptConnection(client, localEp_, true);
+            }
+        }
+        catch(IOException e)
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+            closeQuietly(client);
+        }
+    }
+
+    /* Closes a channel whose setup failed, a failure to close is only logged. */
+    private void closeQuietly(SocketChannel channel)
+    {
+        if ( channel == null )
+            return;
+        try
+        {
+            channel.close();
+        }
+        catch(IOException e)
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/TcpConnectionManager.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import org.apache.log4j.Logger;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class TcpConnectionManager
+{
+    /* Guards the compound operations on allConnections_. */
+    private Lock lock_ = new ReentrantLock();
+    private List<TcpConnection> allConnections_;
+    private EndPoint localEp_;
+    private EndPoint remoteEp_;
+    private int initialSize_;
+    private int growthFactor_;
+    /* Upper bound for the number of pooled connections. */
+    private int maxSize_;
+    private long lastTimeUsed_;
+    private boolean isShut_;
+
+    /* Number of connections handed out by getConnection() and not yet closed. */
+    private int inUse_;
+
+    TcpConnectionManager(int initialSize, int growthFactor, int maxSize, EndPoint localEp, EndPoint remoteEp)
+    {
+        initialSize_ = initialSize;
+        growthFactor_ = growthFactor;
+        maxSize_ = maxSize;
+        localEp_ = localEp;
+        remoteEp_ = remoteEp;
+        isShut_ = false;
+        lastTimeUsed_ = System.currentTimeMillis();
+        allConnections_ = new Vector<TcpConnection>();
+    }
+
+    /**
+     * Hands out a connection to the remote end point and marks it as in use.
+     * An empty pool gets a new connection. Otherwise the least loaded
+     * connection is reused when it has nothing queued or the pool is full.
+     * Failing that a new connection is opened; if the pool already contains
+     * an equal one the new connection is closed again and the least loaded
+     * pooled connection is returned.
+     *
+     * @return a connection marked as in use, or null if the pool was emptied
+     *         concurrently while falling back to a pooled connection
+     * @throws IOException if a new connection cannot be opened
+     */
+    TcpConnection getConnection() throws IOException
+    {
+        lock_.lock();
+        try
+        {
+            if (allConnections_.isEmpty())
+            {
+                TcpConnection conn = new TcpConnection(this, localEp_, remoteEp_);
+                addToPool(conn);
+                return markInUse(conn);
+            }
+
+            TcpConnection least = getLeastLoaded();
+
+            /* least is null if the pool was emptied in the meantime, open a new connection then */
+            if ( least != null && ( least.pending() == 0 || allConnections_.size() == maxSize_ ) )
+            {
+                return markInUse(least);
+            }
+
+            TcpConnection connection = new TcpConnection(this, localEp_, remoteEp_);
+            if ( !contains(connection) )
+            {
+                addToPool(connection);
+                return markInUse(connection);
+            }
+
+            connection.closeSocket();
+            /* account for the fallback as well, else close() drives the usage count negative */
+            TcpConnection fallback = getLeastLoaded();
+            return ( fallback != null ) ? markInUse(fallback) : null;
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /* Flags the connection as handed out and bumps the usage count. */
+    private TcpConnection markInUse(TcpConnection connection)
+    {
+        connection.inUse_ = true;
+        incUsed();
+        return connection;
+    }
+
+    /**
+     * Sorts the pool by load (see TcpConnection.compareTo) and returns the
+     * first connection.
+     *
+     * @return the least loaded connection or null if the pool is empty
+     */
+    protected TcpConnection getLeastLoaded()
+    {
+        TcpConnection connection = null;
+        lock_.lock();
+        try
+        {
+            Collections.sort(allConnections_);
+            connection = (allConnections_.size() > 0 ) ? allConnections_.get(0) : null;
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        return connection;
+    }
+
+    /**
+     * Removes exactly the given connection object from the pool. Removal is by
+     * identity: TcpConnection.equals only compares end points, so an equals
+     * based remove would evict a pooled connection whenever an equal
+     * connection that never made it into the pool is closed.
+     */
+    void removeConnection(TcpConnection connection)
+    {
+        /* the list is a Vector, which guards its own operations with its monitor */
+        synchronized ( allConnections_ )
+        {
+            for ( Iterator<TcpConnection> it = allConnections_.iterator(); it.hasNext(); )
+            {
+                if ( it.next() == connection )
+                {
+                    it.remove();
+                    return;
+                }
+            }
+        }
+    }
+
+    void incUsed()
+    {
+        inUse_++;
+    }
+
+    void decUsed()
+    {
+        inUse_--;
+    }
+
+    int getConnectionsInUse()
+    {
+        return inUse_;
+    }
+
+    /**
+     * Adds the connection to the pool unless an equal connection is already
+     * pooled. A connection that does not fit because the pool is full is
+     * closed instead.
+     */
+    void addToPool(TcpConnection connection)
+    {
+        lock_.lock();
+        try
+        {
+            /* checked under the lock so that two threads cannot both add an equal connection */
+            if ( contains(connection) )
+                return;
+
+            if ( allConnections_.size() < maxSize_ )
+            {
+                allConnections_.add(connection);
+            }
+            else
+            {
+                connection.closeSocket();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+
+    /** Removes every connection from the pool, closes each one and marks the pool as shut. */
+    void shutdown()
+    {
+        lock_.lock();
+        try
+        {
+            while ( allConnections_.size() > 0 )
+            {
+                TcpConnection connection = allConnections_.remove(0);
+                connection.closeSocket();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        isShut_ = true;
+    }
+
+    int getPoolSize()
+    {
+        return allConnections_.size();
+    }
+
+    EndPoint getLocalEndPoint()
+    {
+        return localEp_;
+    }
+
+    EndPoint getRemoteEndPoint()
+    {
+        return remoteEp_;
+    }
+
+    /** @return the number of queued writes summed over all pooled connections */
+    int getPendingWrites()
+    {
+        int total = 0;
+        lock_.lock();
+        try
+        {
+            for ( TcpConnection connection : allConnections_ )
+            {
+                total += connection.pending();
+            }
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        return total;
+    }
+
+    /** @return true if the pool holds a connection that equals the given one */
+    boolean contains(TcpConnection connection)
+    {
+        return allConnections_.contains(connection);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/UdpConnection.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/UdpConnection.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/UdpConnection.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.net.SocketAddress;
+import java.nio.*;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Non-blocking UDP endpoint that frames every datagram with a 4-byte protocol
+ * marker followed by a serialized {@link Message}.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class UdpConnection extends SelectionKeyHandler
+{
+    private static final Logger logger_ = Logger.getLogger(UdpConnection.class);
+    /* Size of the receive buffer; a larger datagram does not fit in a single read. */
+    private static final int BUFFER_SIZE = 4096;
+    /* Marker written in front of every datagram and verified on receipt. */
+    private static final int protocol_ = 0xBADBEEF;
+    
+    private DatagramChannel socketChannel_;
+    private SelectionKey key_;    
+    private EndPoint localEndPoint_;
+    
+    /**
+     * Opens an unbound, non-blocking channel. The channel is not registered
+     * with any selector by this method.
+     *
+     * @throws IOException if the channel cannot be opened or configured
+     */
+    public void init() throws IOException
+    {
+        socketChannel_ = DatagramChannel.open();
+        socketChannel_.socket().setReuseAddress(true);
+        socketChannel_.configureBlocking(false);        
+    }
+    
+    /**
+     * Opens a non-blocking channel bound to the given local port and registers
+     * it for reads with the UDP selector manager.
+     *
+     * @param port local port to bind to
+     * @throws IOException if the channel cannot be opened, bound or configured
+     */
+    public void init(int port) throws IOException
+    {
+        // TODO: get TCP port from config and add one.
+        localEndPoint_ = new EndPoint(port);
+        socketChannel_ = DatagramChannel.open();
+        // SO_REUSEADDR must be set before bind(); changing it on an already
+        // bound socket has undefined behaviour.
+        socketChannel_.socket().setReuseAddress(true);
+        socketChannel_.socket().bind(localEndPoint_.getInetAddress());
+        socketChannel_.configureBlocking(false);        
+        key_ = SelectorManager.getUdpSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+    }
+    
+    /**
+     * Serializes the message, prefixes it with the protocol marker and sends
+     * it as one datagram.
+     *
+     * @param message message to send
+     * @param to destination end point
+     * @return false if the channel reported that zero bytes were sent; true
+     *         otherwise (including when the serialized message was empty and
+     *         nothing was sent)
+     * @throws IOException if serialization or the send fails
+     */
+    public boolean write(Message message, EndPoint to) throws IOException
+    {
+        boolean bVal = true;                       
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        Message.serializer().serialize(message, dos);        
+        byte[] data = bos.toByteArray();
+        if ( data.length > 0 )
+        {  
+            logger_.debug("Size of Gossip packet " + data.length);
+            byte[] protocol = BasicUtilities.intToByteArray(protocol_);
+            ByteBuffer buffer = ByteBuffer.allocate(data.length + protocol.length);
+            buffer.put( protocol );
+            buffer.put(data);
+            buffer.flip();
+            
+            int n  = socketChannel_.send(buffer, to.getInetAddress());
+            if ( n == 0 )
+            {
+                bVal = false;
+            }
+        }
+        return bVal;
+    }
+    
+    /** Closes the underlying channel, logging (not propagating) any I/O error. */
+    void close()
+    {
+        try
+        {
+            if ( socketChannel_ != null )
+                socketChannel_.close();
+        }
+        catch ( IOException ex )
+        {
+            logger_.error( LogUtil.throwableToString(ex) );
+        }
+    }
+    
+    /** Returns the underlying datagram channel (null until an init method ran). */
+    public DatagramChannel getDatagramChannel()
+    {
+        return socketChannel_;
+    }
+    
+    /**
+     * Validates the 4-byte protocol marker at the front of the buffer and
+     * returns the bytes that follow it.
+     *
+     * @param buffer flipped buffer holding one received datagram
+     * @return the datagram body, or an empty array when the datagram is too
+     *         short to carry the marker or the marker does not match
+     */
+    private byte[] gobbleHeaderAndExtractBody(ByteBuffer buffer)
+    {
+        byte[] body = new byte[0];        
+        byte[] protocol = new byte[4];
+        // A datagram shorter than the marker would make get() throw
+        // BufferUnderflowException, which read() does not catch.
+        if ( buffer.remaining() < protocol.length )
+        {
+            logger_.info("Invalid protocol header in the incoming message: only " + buffer.remaining() + " bytes received");
+            return body;
+        }
+        buffer.get(protocol, 0, protocol.length);
+        int value = BasicUtilities.byteArrayToInt(protocol);
+        
+        if ( protocol_ != value )
+        {
+            logger_.info("Invalid protocol header in the incoming message " + value);
+            return body;
+        }
+        body = new byte[buffer.remaining()];
+        buffer.get(body, 0, body.length);       
+        return body;
+    }
+    
+    /**
+     * Receives one datagram, strips the protocol marker, deserializes the
+     * message and hands it to MessagingService. Read interest is switched off
+     * for the duration of the call and switched back on before returning.
+     *
+     * @param key selection key that signalled readiness
+     */
+    public void read(SelectionKey key)
+    {        
+        key.interestOps( key.interestOps() & (~SelectionKey.OP_READ) );
+        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+        try
+        {
+            SocketAddress sa = socketChannel_.receive(buffer);
+            if ( sa == null )
+            {
+                logger_.debug("*** No datagram packet was available to be read ***");
+                return;
+            }            
+            buffer.flip();
+            
+            byte[] bytes = gobbleHeaderAndExtractBody(buffer);
+            if ( bytes.length > 0 )
+            {
+                DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+                Message message = Message.serializer().deserialize(dis);                
+                if ( message != null )
+                {                                        
+                    MessagingService.receive(message);
+                }
+            }
+        }
+        catch ( IOException ioe )
+        {
+            logger_.warn(LogUtil.throwableToString(ioe));
+        }
+        finally
+        {
+            // Restore interest on the same key it was removed from.
+            key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/ColumnFamilyFormatter.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,94 @@
+package org.apache.cassandra.net.http;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.SuperColumn;
+
+
+/**
+ * HTML formatter that renders column families, super columns and simple
+ * columns as tables.
+ *
+ * NOTE(review): summary lines are appended to the StringBuilder passed in,
+ * while the table markup goes through the inherited helpers (startTable(),
+ * addHeaders(), ...), which take no builder argument. Callers presumably pass
+ * the same builder this formatter was constructed with -- confirm.
+ */
+public class ColumnFamilyFormatter extends HTMLFormatter
+{
+    /** Creates a formatter via the superclass no-argument constructor. */
+    public ColumnFamilyFormatter()
+    {
+        super();
+    }
+
+    /** Creates a formatter, handing the given builder to the superclass. */
+    public ColumnFamilyFormatter(StringBuilder sb)
+    {
+        super(sb);
+    }
+
+    /**
+     * Prints the row key followed by the column family.
+     *
+     * @param sb builder receiving the key line and the column family summary
+     * @param sKey row key to print
+     * @param cf column family to print
+     */
+    public void printKeyColumnFamily(StringBuilder sb, String sKey, ColumnFamily cf)
+    {
+        // print the key
+        sb.append("Key = ").append(sKey).append("<br>");
+
+        // print the column family
+        printColumnFamily(sb, cf);
+    }
+
+    /**
+     * Prints the column family name and then its columns, rendered as super
+     * columns when the configured column type is "Super" and as simple
+     * columns otherwise.
+     *
+     * @param sb builder receiving the summary line
+     * @param cf column family to print
+     */
+    public void printColumnFamily(StringBuilder sb, ColumnFamily cf)
+    {
+        // first print the column family specific data
+        sb.append("ColumnFamily = " + cf.name() + "<br>");
+
+        String familyType = DatabaseDescriptor.getColumnType(cf.name());
+        Collection<IColumn> columns = cf.getAllColumns();
+        if (!"Super".equals(familyType))
+        {
+            printSimpleColumns(sb, columns);
+            return;
+        }
+        printSuperColumns(sb, columns);
+    }
+
+    /**
+     * Prints a count line, then one header plus one row of sub-columns per
+     * super column, all inside a single outer table.
+     *
+     * @param sb builder receiving the count line
+     * @param cols super columns to print; every element is cast to SuperColumn
+     */
+    public void printSuperColumns(StringBuilder sb, Collection<IColumn> cols)
+    {
+        // print the super column summary
+        int superColumnCount = cols.size();
+        sb.append("Number of super columns = ").append(superColumnCount).append("<br>");
+
+        startTable();
+        for (IColumn superColumn : cols)
+        {
+            addHeader(superColumn.name());
+            startRow();
+            printSimpleColumns(sb, ((SuperColumn) superColumn).getSubColumns());
+            endRow();
+        }
+        endTable();
+    }
+
+    /**
+     * Renders the columns as a table with one header row of column names and
+     * one data row of column values.
+     *
+     * @param sb not used by this method; the table is emitted through the inherited helpers
+     * @param cols columns to print
+     */
+    public void printSimpleColumns(StringBuilder sb, Collection<IColumn> cols)
+    {
+        int numColumns = cols.size();
+        String[] names = new String[numColumns];
+        String[] values = new String[numColumns];
+
+        // Collect names and values first so that nothing is emitted if reading a column fails.
+        int next = 0;
+        for (IColumn column : cols)
+        {
+            names[next] = column.name();
+            values[next] = new String(column.value());
+            next++;
+        }
+
+        startTable();
+        addHeaders(names);
+        startRow();
+        for (String value : values)
+        {
+            addCol(value);
+        }
+        endRow();
+        endTable();
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HTMLFormatter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HTMLFormatter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HTMLFormatter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HTMLFormatter.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Helper class for writing some basic HTML.
+ */
+
+package org.apache.cassandra.net.http;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ *
+ * @author kranganathan
+ */
+public class HTMLFormatter
+{
+    protected StringBuilder sb_ = null;
+    private boolean writeBody_;
+
+    public HTMLFormatter()
+    {
+        sb_ = new StringBuilder();
+    }
+
+    public HTMLFormatter(StringBuilder sb)
+    {
+        sb_ = sb;
+    }
+
+    public void startBody()
+    {
+    	startBody(false, "", true, true);
+    }
+
+    public void startBody(boolean writeJSCallback, String jsCallbackFunction, boolean writeCSS, boolean writeBody)
+    {
+    	writeBody_ = writeBody;
+
+        sb_.append("<html>\n");
+        if(writeCSS || writeJSCallback)
+        {
+	        sb_.append("<head>\n");
+	        if(writeJSCallback)
+	        	addJSCallback(jsCallbackFunction);
+	        if(writeCSS)
+	        	addCSS();
+	        sb_.append("</head>\n");
+        }
+
+        if(writeBody)
+        {
+        	sb_.append("<body bgcolor=black>\n");
+        }
+    }
+
+    public void endBody()
+    {
+    	if(writeBody_)
+    	{
+    		sb_.append("</body>\n");
+    	}
+        sb_.append("</html>\n");
+    }
+
+    public void appendLine(String s)
+    {
+        sb_.append(s);
+        sb_.append("<br>\n");
+    }
+
+    public void append(String s)
+    {
+        sb_.append(s);
+    }
+
+    public void addJScript(String jscript)
+    {
+    	append("<script language=\"text/javascript\">\n");
+    	append(jscript + "\n");
+    	append("</script>\n");
+    }
+
+    public void startTable()
+    {
+        sb_.append("<table>\n");
+    }
+
+    public void addHeaders(String[] sTableHeaders)
+    {
+        sb_.append("<tr style=\"border: 2px solid #333333\"	>\n");
+        for (int i = 0; i < sTableHeaders.length; ++i)
+        {
+            sb_.append("<th><div class=\"tmenubar\">");
+            sb_.append("<b>" + sTableHeaders[i] + "</b>");
+            sb_.append("</div></th>\n");
+        }
+        sb_.append("\n</tr>\n\n");
+    }
+
+    public void addHeader(String sTableHeader)
+    {
+        sb_.append("<tr style=\"border: 2px solid #333333\"	>\n");
+        sb_.append("<th><div class=\"tmenubar\">");
+        sb_.append("<b>" + sTableHeader + "</b>");
+        sb_.append("</div></th>\n");
+        sb_.append("\n</tr>\n\n");
+    }
+
+    public void startRow()
+    {
+        sb_.append("<tr style=\"border: 2px solid #333333\">\n");
+    }
+
+    public void addCol(String sData)
+    {
+        sb_.append("<td style=\"border: 2px solid #333333\">");
+        sb_.append(sData);
+        sb_.append("</td>");
+    }
+
+    public void endRow()
+    {
+        sb_.append("</tr>\n");
+    }
+
+    public void endTable()
+    {
+        sb_.append("</table>\n");
+    }
+
+    public void addCombobox(Set<String> comboBoxEntries, String htmlElementName)
+    {
+    	addCombobox(comboBoxEntries, htmlElementName, -1);
+    }
+
+    public void addCombobox(Set<String> comboBoxEntries, String htmlElementName, int defaultSelected)
+    {
+    	sb_.append("  <select name=" + htmlElementName + " size=1>\n");
+    	if(defaultSelected == -1)
+    	{
+    		sb_.append("    <option value=\"\" SELECTED>Select an option \n");
+    	}
+
+    	int i = 0;
+    	for(String colFamName : comboBoxEntries)
+    	{
+    		if(defaultSelected == i)
+    		{
+    			sb_.append("    <option value=\"" + colFamName + "\" SELECTED>" + colFamName + "\n");
+    		}
+    		else
+    		{
+    			sb_.append("    <option value=\"" + colFamName + "\">" + colFamName + "\n");
+    		}
+    	}
+    	sb_.append("  </select>\n");
+    }
+
+    public void addDivElement(String divId, String value)
+    {
+    	sb_.append("<div id = \"" + divId + "\">");
+    	if(value != null)
+    		sb_.append(value);
+    	sb_.append("</div>\n");
+    }
+
+    public void createTable(String[] sTableHeaders, String[][] sTable)
+    {
+        if (sTable == null || sTable.length == 0)
+            return;
+
+        sb_.append("<table style=\"border: 2px solid #333333\">\n");
+
+        sb_.append("<tr style=\"border: 2px solid #333333\">\n");
+        for (int i = 0; i < sTableHeaders.length; ++i)
+        {
+            sb_.append("<td style=\"border: 2px solid #333333\">");
+            sb_.append("<b>" + sTableHeaders[i] + "</b>");
+            sb_.append("</td>\n");
+        }
+        sb_.append("\n</tr>\n\n");
+
+        for (int i = 0; i < sTable.length; ++i)
+        {
+            sb_.append("<tr style=\"border: 2px solid #333333\">\n");
+            for (int j = 0; j < sTable[i].length; ++j)
+            {
+                sb_.append("<td style=\"border: 2px solid #333333\">");
+                sb_.append(sTable[i][j]);
+                sb_.append("</td>\n");
+            }
+            sb_.append("\n</tr>\n\n");
+        }
+        sb_.append("</table>\n");
+    }
+
+    public void addJSCallback(String jsCallbackFunction)
+    {
+    	sb_.append("<script type=\"text/javascript\">\n");
+
+    	addJSForTabs();
+
+    	sb_.append(jsCallbackFunction +"\n");
+    	sb_.append("</script>\n");
+    }
+
+    public void addCSS()
+    {
+        sb_.append("<style type=\"text/css\">\n");
+        sb_.append("body\n");
+        sb_.append("{\n");
+        sb_.append("  color:white;\n");
+        sb_.append("  font-family:Arial Unicode MS,Verdana, Arial, Sans-serif;\n");
+        sb_.append("  font-size:10pt;\n");
+        sb_.append("}\n");
+
+        sb_.append(".tmenubar\n");
+        sb_.append("{\n");
+        sb_.append("  background-color:green;\n");
+        sb_.append("  font-family:Verdana, Arial, Sans-serif;\n");
+        sb_.append("  font-size:10pt;\n");
+        sb_.append("  font-weight:bold;\n");
+        sb_.append("}\n");
+
+        sb_.append("th\n");
+        sb_.append("{\n");
+        sb_.append(" 	 color:white;\n");
+        sb_.append("}\n");
+
+        sb_.append("td\n");
+        sb_.append("{\n");
+        sb_.append(" 	 color:white;\n");
+        sb_.append("}\n");
+        sb_.append("a:link {color:#CAF99B;font-size:10pt;font-weight:bold;font-family:Arial Unicode MS,Lucida-grande,Verdana}\n");
+        sb_.append("a:visited {color:red}\n");
+        sb_.append("a:hover{color:yellow;font-size:10pt;font-weight:bold;font-family:Arial Unicode MS,Lucida-grande,Verdana;background-color:green}\n");
+
+        addCSSForTabs();
+
+        sb_.append("</style>\n");
+
+    }
+
+    public void addCSSForTabs()
+    {
+    	sb_.append("#header ul {\n");
+    	sb_.append("	list-style: none;\n");
+    	sb_.append("	padding: 0;\n");
+    	sb_.append("	margin: 0;\n");
+    	sb_.append("	}\n");
+    	sb_.append("\n");
+    	sb_.append("#header li {\n");
+    	sb_.append("	float: left;\n");
+    	sb_.append("	border: 1px solid #bbb;\n");
+    	sb_.append("	border-bottom-width: 0;\n");
+    	sb_.append("	margin: 0;\n");
+    	sb_.append("}\n");
+    	sb_.append("\n");
+    	sb_.append("#header a {\n");
+    	sb_.append("	text-decoration: none;\n");
+    	sb_.append("	display: block;\n");
+    	sb_.append("	background: #eee;\n");
+    	sb_.append("	padding: 0.24em 1em;\n");
+    	sb_.append("	color: #00c;\n");
+    	sb_.append("	width: 8em;\n");
+    	sb_.append("	text-align: center;\n");
+    	sb_.append("	}\n");
+    	sb_.append("\n");
+    	sb_.append("#header a:hover {\n");
+    	sb_.append("	background: #ddf;\n");
+    	sb_.append("}\n");
+    	sb_.append("\n");
+    	sb_.append("#header #selected {\n");
+    	sb_.append("	border-color: black;\n");
+    	sb_.append("}\n");
+    	sb_.append("\n");
+    	sb_.append("#header #selected a {\n");
+    	sb_.append("	position: relative;\n");
+    	sb_.append("	top: 1px;\n");
+    	sb_.append("	background: white;\n");
+    	sb_.append("	color: black;\n");
+    	sb_.append("	font-weight: bold;\n");
+    	sb_.append("}\n");
+    	sb_.append("\n");
+    	sb_.append("#content {\n");
+    	sb_.append("	border: 1px solid black;\n");
+    	sb_.append("	visibility:hidden;\n");
+    	sb_.append("	position:absolute;\n");
+    	sb_.append("	top:200;\n");
+    	sb_.append("	clear: both;\n");
+    	sb_.append("	padding: 0 1em;\n");
+    	sb_.append("}\n");
+    	sb_.append("\n");
+    	sb_.append("h1 {\n");
+    	sb_.append("	margin: 0;\n");
+    	sb_.append("	padding: 0 0 1em 0;\n");
+    	sb_.append("}\n");
+    }
+
+    public void addJSForTabs()
+    {
+    	sb_.append("var curSelectedDivId = \"one\";\n");
+    	sb_.append("\n");
+    	sb_.append("function selectTab(tabDivId)\n");
+    	sb_.append("{\n");
+    	sb_.append("	var x = document.getElementsByName(curSelectedDivId);\n");
+    	sb_.append("	if(x[1])\n");
+    	sb_.append("		x[1].style.visibility=\"hidden\";\n");
+    	sb_.append("	if(x[0])\n");
+    	sb_.append("		x[0].id=curSelectedDivId;\n");
+    	sb_.append("\n");
+    	sb_.append("\n");
+    	sb_.append("	var y = document.getElementsByName(tabDivId);\n");
+    	sb_.append("	if(y[1])\n");
+    	sb_.append("		y[1].style.visibility=\"visible\";\n");
+    	sb_.append("	if(y[0])\n");
+    	sb_.append("		y[0].id = \"selected\";\n");
+    	sb_.append("\n");
+    	sb_.append("	curSelectedDivId = tabDivId;\n");
+    	sb_.append("}\n");
+    }
+
+    public String toString()
+    {
+        return sb_.toString();
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnection.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This class accepts a client connection and parses http data from it.
+ */
+
+// TODO: shouldClose_ is computed while parsing but never acted upon. Decide when it should cause the socket to be closed.
+
+package org.apache.cassandra.net.http;
+
+import java.util.*;
+import java.net.*;
+import java.io.*;
+import java.nio.*;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.SelectionKeyHandler;
+import org.apache.cassandra.net.SelectorManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles one client HTTP connection: reads bytes off the socket on a read
+ * executor thread, parses them incrementally (start line, headers, body) and
+ * hands each completed request to the HTTP stage via MessagingService.
+ *
+ * @author kranganathan
+ */
+public class HttpConnection extends SelectionKeyHandler implements HttpStartLineParser.Callback, HttpHeaderParser.Callback
+{
+    private static final Logger logger_ = Logger.getLogger(HttpConnection.class);
+    public static final String httpRequestVerbHandler_ = "HTTP-REQUEST-VERB-HANDLER";
+    public static final String httpStage_ = "HTTP-STAGE";
+
+    /*
+     * These are the callbacks into who ever intends
+     * to listen on the client socket.
+     */
+    public interface HttpConnectionListener
+    {
+        public void onRequest(HttpRequest httpRequest);
+        public void onResponse(HttpResponse httpResponse);
+    }
+
+    enum HttpMessageType
+    {
+        UNKNOWN,
+        REQUEST,
+        RESPONSE
+    }
+
+    enum ParseState
+    {
+        IN_NEW,
+        IN_START,
+        IN_HEADERS, IN_BODY
+    }
+
+    private ParseState parseState_ = ParseState.IN_NEW;
+    private long parseStartTime_ = 0;
+    private HttpMessageType currentMsgType_ = HttpMessageType.UNKNOWN;
+    /* Body bytes still expected for the message being parsed. */
+    private int contentLength_ = 0;
+    private List<ByteBuffer> bodyBuffers_ = new LinkedList<ByteBuffer>();
+    /* Derived from the HTTP version and Connection header; currently only written, never read. */
+    private boolean shouldClose_ = false;
+    private String defaultContentType_ = "text/html";
+    private HttpRequest currentRequest_ = null;
+    private HttpResponse currentResponse_ = null;
+    private HttpStartLineParser startLineParser_ = new HttpStartLineParser(this);
+    private HttpHeaderParser headerParser_ = new HttpHeaderParser(this);
+    /* Selection Key associated with this HTTP Connection */
+    private SelectionKey httpKey_;
+    /* SocketChannel associated with this HTTP Connection */
+    private SocketChannel httpChannel_;
+    /* HTTPReader instance associated with this HTTP Connection */
+    private HTTPReader httpReader_ = new HTTPReader();
+
+    /*
+     * This abstraction starts reading the data that comes in
+     * on a HTTP request. It accumulates the bytes read into
+     * a buffer and passes the buffer to the HTTP parser.
+    */
+
+    class HTTPReader implements Runnable
+    {
+        /* We read 256 bytes at a time from a HTTP connection */
+        private static final int bufferSize_ = 256;
+
+        /*
+         * Read one buffer from the channel and feed it to the parser.
+         * read(SelectionKey) switched read interest off before scheduling
+         * this task, so every path here must either re-arm the key or
+         * close the connection.
+         */
+        public void run()
+        {
+            ByteBuffer readBuffer = ByteBuffer.allocate(HTTPReader.bufferSize_);
+            try
+            {
+                int bytesRead = httpChannel_.read(readBuffer);
+                if ( bytesRead < 0 )
+                {
+                    /* End of stream: the peer closed its side, nothing more will arrive. */
+                    close();
+                    return;
+                }
+                readBuffer.flip();
+                if ( readBuffer.remaining() > 0 )
+                {
+                    /* parse() re-arms the key for read in its finally block. */
+                    HttpConnection.this.parse(readBuffer);
+                }
+                else
+                {
+                    /* Nothing was read; re-arm here since parse() is not invoked. */
+                    SelectorManager.getSelectorManager().modifyKeyForRead(httpKey_);
+                }
+            }
+            catch ( IOException ex )
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+                /* The channel is unusable after a failed read. */
+                close();
+            }
+        }
+    }
+
+    /* Pairs a parsed request with the connection it arrived on. */
+    public static class HttpRequestMessage
+    {
+        private HttpRequest httpRequest_;
+        private HttpConnection httpConnection_;
+
+        HttpRequestMessage(HttpRequest httpRequest, HttpConnection httpConnection)
+        {
+            httpRequest_ = httpRequest;
+            httpConnection_ = httpConnection;
+        }
+
+        public HttpRequest getHttpRequest()
+        {
+            return httpRequest_;
+        }
+
+        public HttpConnection getHttpConnection()
+        {
+            return httpConnection_;
+        }
+    }
+
+    /*
+     *  Read called on the Selector thread. This is called
+     *  when there is some HTTP request that needs to be
+     *  processed. The key and channel are captured on the
+     *  first call; the actual socket read runs on the read executor.
+    */
+    public void read(SelectionKey key)
+    {
+        if ( httpKey_ == null )
+        {
+            httpKey_ = key;
+            httpChannel_ = (SocketChannel)key.channel();
+        }
+        /* deregister interest for read */
+        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+        /* Add a task to process the HTTP request */
+        MessagingService.getReadExecutor().execute(httpReader_);
+    }
+
+    /* Switches read interest back on for the given key. */
+    public void modifyKeyForRead(SelectionKey key)
+    {
+        /* Read and write the interest set of the same key. */
+        key.interestOps( key.interestOps() | SelectionKey.OP_READ );
+    }
+
+    /* Returns both parsers and all per-message fields to their initial state. */
+    private void resetParserState()
+    {
+        startLineParser_.resetParserState();
+        headerParser_.resetParserState();
+        parseState_ = ParseState.IN_NEW;
+        contentLength_ = 0;
+        bodyBuffers_ = new LinkedList<ByteBuffer>();
+        currentMsgType_ = HttpMessageType.UNKNOWN;
+        currentRequest_ = null;
+        currentResponse_ = null;
+    }
+
+    /*
+     * Cancels this connection's selection key, if one was captured.
+     * NOTE(review): presumably SelectorManager.cancel also closes the
+     * channel -- confirm, otherwise the socket itself stays open.
+     */
+    public void close()
+    {        
+        logger_.info("Closing HTTP socket ...");
+        if ( httpKey_ != null )
+            SelectorManager.getSelectorManager().cancel(httpKey_);
+    }
+
+    /*
+     * Process the HTTP commands sent from the client. Consumes the given
+     * bytes, resuming from the state left by the previous call. The cases
+     * deliberately fall through so that one buffer can complete several
+     * phases. Read interest is always re-armed on exit.
+    */
+    public void parse(ByteBuffer bb)
+    {
+        try
+        {
+            logger_.debug("Processing http requests from socket ...");
+            switch (parseState_)
+            {
+                case IN_NEW:
+                    parseState_ = ParseState.IN_START;
+                    parseStartTime_ = System.currentTimeMillis();
+
+                // fall through
+                case IN_START:
+                    if (!startLineParser_.onMoreBytesNew(bb))
+                    {
+                        break; // need more bytes
+                    }
+                    /* Start line already delivered through the callback */
+                    parseState_ = ParseState.IN_HEADERS;
+
+                // fall through
+                case IN_HEADERS:
+                    if (!headerParser_.onMoreBytesNew(bb))
+                    {
+                        break; // need more bytes
+                    }
+                    if (!finishHeaders())
+                    {
+                        return;
+                    }
+
+                // fall through
+                case IN_BODY:
+                    if (contentLength_ > 0)
+                    {
+                        if (bb.remaining() > contentLength_)
+                        {
+                            /* The buffer holds more than the rest of the body: take only the body part. */
+                            int newLimit = bb.position() + contentLength_;
+                            bodyBuffers_.add(((ByteBuffer) bb.duplicate().limit(newLimit)).slice());
+                            bb.position(newLimit);
+                            contentLength_ = 0;
+                        }
+                        else
+                        {
+                            contentLength_ -= bb.remaining();
+                            bodyBuffers_.add(bb.duplicate());
+                            bb.position(bb.limit());
+                        }
+                    }
+
+                    if (contentLength_ == 0)
+                    {
+                        dispatchCompletedMessage();
+                        resetParserState();
+                    }
+            }
+
+        }
+        catch (final Throwable e)
+        {
+            logger_.warn(LogUtil.throwableToString(e));
+            /* Drop the half-parsed message so it is not re-processed on the next read. */
+            resetParserState();
+            //close();
+        }
+        finally
+        {
+            SelectorManager.getSelectorManager().modifyKeyForRead(httpKey_);
+        }
+    }
+
+    /*
+     * Runs once all headers are in: derives shouldClose_ for requests, reads
+     * Content-Length and moves the parser to IN_BODY.
+     * Returns false (leaving the state untouched) when the message type was never set.
+     */
+    private boolean finishHeaders() throws HttpParsingException
+    {
+        String len;
+        if (currentMsgType_ == HttpMessageType.REQUEST)
+        {
+            len = currentRequest_.getHeader(HttpProtocolConstants.CONTENT_LENGTH);
+            updateShouldClose(currentRequest_);
+        }
+        else if (currentMsgType_ == HttpMessageType.RESPONSE)
+        {
+            len = currentResponse_.getHeader(HttpProtocolConstants.CONTENT_LENGTH);
+
+            // TODO: pay attention to keep-alive and
+            // close headers
+        }
+        else
+        {
+            logger_.warn("in HttpConnection::processInput_() Message type is not set");
+            return false;
+        }
+
+        if (len != null)
+        {
+            contentLength_ = parseContentLength(len);
+        }
+        parseState_ = ParseState.IN_BODY;
+        return true;
+    }
+
+    /*
+     * Sets shouldClose_ from the request's HTTP version and Connection header:
+     * HTTP/1.1 closes only on "Connection: close" (the flag is otherwise left as is),
+     * HTTP/1.0 closes unless "Connection: keep-alive", anything else closes.
+     */
+    private void updateShouldClose(HttpRequest request)
+    {
+        String version = request.getVersion();
+        if (version.equalsIgnoreCase("HTTP/1.1"))
+        {
+            String val = request.getHeader(HttpProtocolConstants.CONNECTION);
+            if (val != null && val.equalsIgnoreCase(HttpProtocolConstants.CLOSE))
+            {
+                shouldClose_ = true;
+            }
+        }
+        else if (version.equalsIgnoreCase("HTTP/1.0"))
+        {
+            /* By default no keep-alive */
+            String val = request.getHeader(HttpProtocolConstants.CONNECTION);
+            shouldClose_ = !(val != null && val.equalsIgnoreCase(HttpProtocolConstants.KEEP_ALIVE));
+        }
+        else
+        {
+            /* Assume 0.9 */
+            shouldClose_ = true;
+        }
+    }
+
+    /*
+     * Converts a Content-Length header value to an int. A blank value counts
+     * as 0; a non-numeric or negative value is rejected, because a negative
+     * length would leave the parser waiting in IN_BODY forever.
+     */
+    private static int parseContentLength(String len) throws HttpParsingException
+    {
+        String trimmed = len.trim();
+        if (trimmed.equals(""))
+        {
+            return 0;
+        }
+
+        int value;
+        try
+        {
+            value = Integer.parseInt(trimmed);
+        }
+        catch (NumberFormatException ex)
+        {
+            throw new HttpParsingException();
+        }
+        if (value < 0)
+        {
+            throw new HttpParsingException();
+        }
+        return value;
+    }
+
+    /* Called when the whole body has arrived: forwards requests, logs anything else. */
+    private void dispatchCompletedMessage()
+    {
+        if (currentMsgType_ == HttpMessageType.REQUEST)
+        {
+            //currentRequest_.setParseTime(env_.getCurrentTime() - parseStartTime_);
+            currentRequest_.setBody(bodyBuffers_);
+
+            if (currentRequest_.getHeader("Content-Type") == null)
+            {
+                currentRequest_.addHeader("Content-Type", defaultContentType_);
+            }
+
+            handleRequest(currentRequest_);
+        }
+        else if (currentMsgType_ == HttpMessageType.RESPONSE)
+        {
+            logger_.info("Holy shit! We are not supposed to be here - ever !!!");
+        }
+        else
+        {
+            logger_.error("Http message type is still" +
+                    " unset after we finish parsing the body?");
+        }
+    }
+
+    /*
+     * Writes the whole buffer to the client and then closes the connection,
+     * whether or not the write succeeded.
+     */
+    public void write(ByteBuffer buffer)
+    {
+        /*
+         * TODO: Make this a non blocking write.
+        */
+        try
+        {
+            while ( buffer.remaining() > 0 )
+            {
+                httpChannel_.write(buffer);
+            }
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn(LogUtil.throwableToString(ex));
+        }
+        finally
+        {
+            /* Close on failure too, so a broken connection is not left registered. */
+            close();
+        }
+    }
+
+    /* Wraps the request in a Message addressed to the HTTP stage and delivers it locally. */
+    private void handleRequest(HttpRequest request)
+    {
+        HttpConnection.HttpRequestMessage httpRequestMessage = new HttpConnection.HttpRequestMessage(request, this);
+        Message httpMessage = new Message(null, HttpConnection.httpStage_, HttpConnection.httpRequestVerbHandler_, new Object[]{httpRequestMessage});
+        MessagingService.receive(httpMessage);
+    }
+
+    // HttpStartLineParser.Callback interface implementation
+    // A start line whose first token begins with "HTTP" is a response status line;
+    // anything else is treated as a request line.
+    public void onStartLine(String method, String path, String query, String version)
+    {
+        logger_.debug("Startline method=" + method + " path=" + path + " query=" + query + " version=" + version);
+
+        if (method.startsWith("HTTP"))
+        {
+            // response
+            currentMsgType_ = HttpMessageType.RESPONSE;
+            currentResponse_ = new HttpResponse();
+            currentResponse_.setStartLine(method, path, version);
+        }
+        else
+        {
+            // request
+            currentMsgType_ = HttpMessageType.REQUEST;
+            currentRequest_ = new HttpRequest();
+            currentRequest_.setStartLine(method, path, query, version);
+        }
+    }
+
+    // HttpHeaderParser.Callback interface implementation
+    // Stores the header on whichever message (request or response) is being parsed.
+    public void onHeader(String name, String value)
+    {
+        if (currentMsgType_ == HttpMessageType.REQUEST)
+        {
+            currentRequest_.addHeader(name, value);
+        }
+        else if (currentMsgType_ == HttpMessageType.RESPONSE)
+        {
+            currentResponse_.addHeader(name, value);
+        }
+        else
+        {
+            logger_.warn("Unknown message type -- HttpConnection::onHeader()");
+        }
+
+        logger_.debug(name + " : " + value);
+    }
+}
+
+
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnectionHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnectionHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnectionHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpConnectionHandler.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.SelectionKeyHandler;
+import org.apache.cassandra.net.SelectorManager;
+import org.apache.log4j.Logger;
+
+/**
+ * SelectionKeyHandler whose accept() takes a pending connection from a
+ * ServerSocketChannel, switches it to non-blocking mode and registers it
+ * with the SelectorManager for OP_READ, using a new HttpConnection as the
+ * handler of the accepted channel.
+ */
+public class HttpConnectionHandler extends SelectionKeyHandler
+{
+    private static final Logger logger_ = Logger.getLogger(HttpConnectionHandler.class);
+
+    /**
+     * Accepts one pending connection on the channel behind the given key.
+     *
+     * Nothing happens when accept() yields no channel. An IOException is
+     * logged as a warning and not propagated; if it is raised after the
+     * connection was accepted, the accepted channel is closed so that its
+     * socket is not leaked.
+     *
+     * @param key selection key whose channel is cast to ServerSocketChannel
+     */
+    public void accept(SelectionKey key)
+    {
+        // declared outside the try block so the failure path can release it
+        SocketChannel client = null;
+        try
+        {
+            ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
+            client = serverChannel.accept();
+            if ( client != null )
+            {
+                client.configureBlocking(false);
+                SelectionKeyHandler handler = new HttpConnection();
+                SelectorManager.getSelectorManager().register(client, handler, SelectionKey.OP_READ);
+            }
+        }
+        catch(IOException e)
+        {
+            logger_.warn(e);
+            // no handler owns the channel at this point; close it here
+            closeAfterFailure_(client);
+        }
+    }
+
+    /**
+     * Closes a channel whose setup failed; a failure to close is only logged.
+     *
+     * @param client the accepted channel, or null if nothing was accepted
+     */
+    private static void closeAfterFailure_(SocketChannel client)
+    {
+        if ( client == null )
+        {
+            return;
+        }
+        try
+        {
+            client.close();
+        }
+        catch(IOException closeError)
+        {
+            logger_.warn(closeError);
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpHeaderParser.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpHeaderParser.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpHeaderParser.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpHeaderParser.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An incremental parser for HTTP header lines.
+ *
+ * Bytes are handed to the parser as they arrive, either through
+ * onMoreBytes(InputStream) or through onMoreBytesNew(ByteBuffer). Every
+ * completed header is reported to the Callback with its key and its value
+ * trimmed, and the feeding method returns true once the empty line that
+ * ends the header section has been consumed.
+ *
+ * Each byte is turned into the character with the same numeric value
+ * (0..255) by both entry points.
+ *
+ * The two *_WITH_READ_SLASH_R states are only handled by
+ * onMoreBytesNew(ByteBuffer); onMoreBytes(InputStream) has no case for them.
+ * NOTE(review): a single instance should therefore presumably be fed
+ * through only one of the two entry points -- confirm against the callers.
+ *
+ * @author kranganathan
+ */
+public class HttpHeaderParser
+{
+
+    // receiver of completed headers; may be null, in which case headers are dropped
+    private final Callback callback_;
+
+    /**
+     * Receiver of the headers found by the parser.
+     */
+    public interface Callback
+    {
+
+        /**
+         * Called once for every completed header line.
+         *
+         * @param key   the trimmed text in front of the first ':'
+         * @param value the trimmed text after it, continuation lines included
+         */
+        public void onHeader(String key, String value);
+    }
+
+    /**
+     * @param cb receiver of the parsed headers; null is tolerated
+     */
+    public HttpHeaderParser(Callback cb)
+    {
+        callback_ = cb;
+    }
+
+    enum HeaderParseState
+    {
+        // we are at the very beginning of the line
+        START_OF_HEADER_LINE,
+        // we are at line beginning, read '\r' but ran out of bytes in this round
+        START_OF_HEADER_LINE_WITH_READ_SLASH_R,
+        // we are in the process of parsing a header key
+        IN_HEADER_KEY,
+        // eat whitespace after the ':' but before the value
+        PRE_HEADER_VALUE_WHITESPACE,
+        // we are in the process of parsing a header value
+        IN_HEADER_VALUE,
+        // we were in IN_HEADER_VALUE and read '\r' but ran out of more bytes
+        IN_HEADER_VALUE_WITH_READ_SLASH_R,
+        /*
+         * got \r\n in the header value.  now consider whether it is a
+         * multi-line value.  For example,
+         *
+         * HeaderKey: HeaderValue\r\n this is still part of the value\r\n
+         *
+         * is a valid HTTP header line with value
+         *
+         * HeaderValue\r\n this is still part of the value
+         *
+         * NOTE: while all whitespace should generally be condensed into a
+         * single space by the HTTP standard, we will just preserve all of the
+         * whitespace for now
+         *
+         * TODO: consider replacing all whitespace with a single space
+         *
+         * TODO: this parser doesn't correctly preserve the \r\n, should it?
+         */
+        CHECKING_END_OF_VALUE,
+        // we are just about to reset the state of the header parser
+        TO_RESET
+    }
+
+    // the current state of the parser
+    private HeaderParseState parseState_ = HeaderParseState.TO_RESET;
+    // incrementally build up this HTTP header key as we read it
+    private final StringBuilder headerKey_ = new StringBuilder(32);
+
+    // incrementally build up this HTTP header value as we read it
+    private final StringBuilder headerValue_ = new StringBuilder(64);
+
+    /**
+     * Discards any partially read key and value and puts the parser at the
+     * start of a header line.
+     */
+    public void resetParserState()
+    {
+        headerKey_.setLength(0);
+        headerValue_.setLength(0);
+        parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+    }
+
+    // hands the collected key/value (both trimmed) to the callback, then resets
+    private void finishCurrentHeader_()
+    {
+        if (callback_ != null)
+        {
+            callback_.onHeader(headerKey_.toString().trim(), headerValue_
+                    .toString().trim());
+        }
+        resetParserState();
+    }
+
+    /*
+     * ByteBuffer.get(int) returns a signed byte.  Masking keeps bytes above
+     * 0x7F in the range 0..255, the same range InputStream.read() produces
+     * in onMoreBytes, so both entry points build the same characters.
+     */
+    private static int unsignedByteAt_(ByteBuffer buffer, int index)
+    {
+        return buffer.get(index) & 0xFF;
+    }
+
+    /**
+     * Consumes header bytes from a stream for as long as available() reports
+     * some.
+     *
+     * mark(1) and reset() are used to push a single byte back, so the stream
+     * has to support them.
+     *
+     * @param in the stream to read from
+     * @return true once the "\r\n" of an empty line has been read at the
+     *         start of a header line; false when the available bytes ran
+     *         out before that point
+     * @throws IOException if the stream fails to read or to reset
+     */
+    public boolean onMoreBytes(InputStream in) throws IOException
+    {
+        int got;
+
+        if (parseState_ == HeaderParseState.TO_RESET)
+        {
+            resetParserState();
+        }
+
+        while (in.available() > 0)
+        {
+            // mark first so that the byte can be pushed back with reset()
+            in.mark(1);
+            got = in.read();
+
+            switch (parseState_)
+            {
+
+            case START_OF_HEADER_LINE:
+                switch (got)
+                {
+                case '\r':
+                    if (in.available() > 0)
+                    {
+                        in.mark(1);
+                        got = in.read();
+
+                        if (got == '\n')
+                        {
+                            // empty line: the header section is complete
+                            parseState_ = HeaderParseState.TO_RESET;
+                            return true;
+                        } // TODO: determine whether this \r-eating is valid
+                        else
+                        {
+                            // keep the byte after the '\r'; the '\r' itself is dropped
+                            in.reset();
+                        }
+                    } // wait for more data to make this decision
+                    else
+                    {
+                        // un-read the '\r' so it is seen again on the next call
+                        in.reset();
+                        return false;
+                    }
+                    break;
+
+                default:
+                    // first byte of a key: push it back for IN_HEADER_KEY
+                    in.reset();
+                    parseState_ = HeaderParseState.IN_HEADER_KEY;
+                    break;
+                }
+                break;
+
+            case IN_HEADER_KEY:
+                switch (got)
+                {
+                case ':':
+                    parseState_ = HeaderParseState.PRE_HEADER_VALUE_WHITESPACE;
+                    break;
+                // TODO: find out: whether to eat whitespace before a :
+                default:
+                    headerKey_.append((char) got);
+                    break;
+                }
+                break;
+
+            case PRE_HEADER_VALUE_WHITESPACE:
+                switch (got)
+                {
+                case ' ':
+                case '\t':
+                    break;
+                default:
+                    // first byte of the value: push it back for IN_HEADER_VALUE
+                    in.reset();
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                    break;
+                }
+                break;
+
+            case IN_HEADER_VALUE:
+                switch (got)
+                {
+                case '\r':
+                    if (in.available() > 0)
+                    {
+                        in.mark(1);
+                        got = in.read();
+
+                        if (got == '\n')
+                        {
+                            parseState_ = HeaderParseState.CHECKING_END_OF_VALUE;
+                            break;
+                        } // TODO: determine whether this \r-eating is valid
+                        else
+                        {
+                            // keep the byte after the '\r'; the '\r' itself is dropped
+                            in.reset();
+                        }
+                    }
+                    else
+                    {
+                        // un-read the '\r' so it is seen again on the next call
+                        in.reset();
+                        return false;
+                    }
+                    break;
+                default:
+                    headerValue_.append((char) got);
+                    break;
+                }
+                break;
+
+            case CHECKING_END_OF_VALUE:
+                switch (got)
+                {
+                case ' ':
+                case '\t':
+                    // continuation line: the whitespace becomes part of the value
+                    in.reset();
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                    break;
+                default:
+                    // the byte belongs to the next line; report the finished header
+                    in.reset();
+                    finishCurrentHeader_();
+                }
+                break;
+            default:
+                assert false;
+                parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+                break;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Consumes header bytes from a buffer, from its position up to its limit.
+     *
+     * On return the position of the buffer has been moved past every byte
+     * that was consumed.
+     *
+     * @param buffer the buffer to read from
+     * @return true once the "\r\n" of an empty line has been read at the
+     *         start of a header line; false when the buffer ran out before
+     *         that point
+     * @throws IOException declared for callers; nothing in this method
+     *         raises it
+     */
+    public boolean onMoreBytesNew(ByteBuffer buffer) throws IOException
+    {
+
+        int got;
+        int limit = buffer.limit();
+        int pos = buffer.position();
+
+        if (parseState_ == HeaderParseState.TO_RESET)
+        {
+            resetParserState();
+        }
+
+        while (pos < limit)
+        {
+            switch (parseState_)
+            {
+
+            case START_OF_HEADER_LINE:
+                if (unsignedByteAt_(buffer, pos) != '\r')
+                {
+                    // not consumed here; IN_HEADER_KEY reads it on the next round
+                    parseState_ = HeaderParseState.IN_HEADER_KEY;
+                    break;
+                }
+                pos++; // eating '\r'
+                if (pos == limit) // Need more bytes
+                {
+                    buffer.position(pos);
+                    parseState_ = HeaderParseState.START_OF_HEADER_LINE_WITH_READ_SLASH_R;
+                    return false;
+                }
+            // fall through
+
+            case START_OF_HEADER_LINE_WITH_READ_SLASH_R:
+                // Processed "...\r\n\r\n" - headers are complete
+                if (unsignedByteAt_(buffer, pos) == '\n')
+                {
+                    buffer.position(++pos);
+                    parseState_ = HeaderParseState.TO_RESET;
+                    return true;
+                }
+                // TODO: determine whether this \r-eating is valid
+                parseState_ = HeaderParseState.IN_HEADER_KEY;
+            // fall through
+
+            case IN_HEADER_KEY:
+                // TODO: find out: whether to eat whitespace before a :
+                while (pos < limit && (got = unsignedByteAt_(buffer, pos)) != ':')
+                {
+                    headerKey_.append((char) got);
+                    pos++;
+                }
+                if (pos < limit)
+                {
+                    pos++; // eating ':'
+                    parseState_ = HeaderParseState.PRE_HEADER_VALUE_WHITESPACE;
+                }
+                break;
+
+            case PRE_HEADER_VALUE_WHITESPACE:
+                while (pos < limit
+                        && ((got = unsignedByteAt_(buffer, pos)) == ' ' || got == '\t'))
+                {
+                    pos++;
+                }
+                if (pos < limit)
+                {
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                }
+                break;
+
+            case IN_HEADER_VALUE:
+                while (pos < limit && (got = unsignedByteAt_(buffer, pos)) != '\r')
+                {
+                    headerValue_.append((char) got);
+                    pos++;
+                }
+                if (pos == limit)
+                {
+                    break;
+                }
+
+                pos++; // eating '\r'
+                if (pos == limit)
+                {
+                    // the byte after the '\r' arrives with the next buffer
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE_WITH_READ_SLASH_R;
+                    break;
+                }
+            // fall through
+
+            case IN_HEADER_VALUE_WITH_READ_SLASH_R:
+                if (unsignedByteAt_(buffer, pos) == '\n')
+                {
+                    parseState_ = HeaderParseState.CHECKING_END_OF_VALUE;
+                    pos++;
+                } // TODO: determine whether this \r-eating is valid
+                else
+                {
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                }
+                break;
+
+            case CHECKING_END_OF_VALUE:
+                // the byte is only inspected; the next state consumes it
+                switch (unsignedByteAt_(buffer, pos))
+                {
+                case ' ':
+                case '\t':
+                    parseState_ = HeaderParseState.IN_HEADER_VALUE;
+                    break;
+
+                default:
+                    // Processed "headerKey headerValue\r\n"
+                    finishCurrentHeader_();
+                }
+                break;
+
+            default:
+                assert false;
+                parseState_ = HeaderParseState.START_OF_HEADER_LINE;
+                break;
+            }
+
+        }
+        // Need to read more bytes - get next buffer
+        buffer.position(pos);
+        return false;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpParsingException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpParsingException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpParsingException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpParsingException.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.cassandra.net.http;
+
+import java.io.IOException;
+
+/**
+ *
+ * @author kranganathan
+ */
+
+public class HttpParsingException extends IOException
+{
+    private static final long serialVersionUID = 1L;
+}
+
+

Added: incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpProtocolConstants.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpProtocolConstants.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpProtocolConstants.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/net/http/HttpProtocolConstants.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+package org.apache.cassandra.net.http;
+
+/**
+ * String constants for HTTP header names and header values.
+ *
+ * Fields of an interface are implicitly public, static and final, so the
+ * modifiers are not spelled out.
+ *
+ * @author kranganathan
+ */
+public interface HttpProtocolConstants
+{
+    /** The text "Connection". */
+    String CONNECTION = "Connection";
+
+    /** The text "Content-Length". */
+    String CONTENT_LENGTH = "Content-Length";
+
+    /** The text "close". */
+    String CLOSE = "close";
+
+    /** The text "Keep-Alive". */
+    String KEEP_ALIVE = "Keep-Alive";
+}