You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC

svn commit: r799331 [22/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/UdpConnection.java Thu Jul 30 15:30:21 2009
@@ -1,168 +1,168 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net;
-
-import java.net.SocketAddress;
-import java.nio.*;
-import java.nio.channels.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.cassandra.net.io.ProtocolState;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.utils.BasicUtilities;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.utils.*;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class UdpConnection extends SelectionKeyHandler
-{
-    private static Logger logger_ = Logger.getLogger(UdpConnection.class);
-    private static final int BUFFER_SIZE = 4096;
-    private static final int protocol_ = 0xBADBEEF;
-    
-    private DatagramChannel socketChannel_;
-    private SelectionKey key_;    
-    private EndPoint localEndPoint_;
-    
-    public void init() throws IOException
-    {
-        socketChannel_ = DatagramChannel.open();
-        socketChannel_.socket().setReuseAddress(true);
-        socketChannel_.configureBlocking(false);        
-    }
-    
-    public void init(int port) throws IOException
-    {
-        // TODO: get TCP port from config and add one.
-        localEndPoint_ = new EndPoint(port);
-        socketChannel_ = DatagramChannel.open();
-        socketChannel_.socket().bind(localEndPoint_.getInetAddress());
-        socketChannel_.socket().setReuseAddress(true);
-        socketChannel_.configureBlocking(false);        
-        key_ = SelectorManager.getUdpSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
-    }
-    
-    public boolean write(Message message, EndPoint to) throws IOException
-    {
-        boolean bVal = true;                       
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        Message.serializer().serialize(message, dos);        
-        byte[] data = bos.toByteArray();
-        if ( data.length > 0 )
-        {  
-            if (logger_.isTraceEnabled())
-                logger_.trace("Size of Gossip packet " + data.length);
-            byte[] protocol = BasicUtilities.intToByteArray(protocol_);
-            ByteBuffer buffer = ByteBuffer.allocate(data.length + protocol.length);
-            buffer.put( protocol );
-            buffer.put(data);
-            buffer.flip();
-            
-            int n  = socketChannel_.send(buffer, to.getInetAddress());
-            if ( n == 0 )
-            {
-                bVal = false;
-            }
-        }
-        return bVal;
-    }
-    
-    void close()
-    {
-        try
-        {
-            if ( socketChannel_ != null )
-                socketChannel_.close();
-        }
-        catch ( IOException ex )
-        {
-            logger_.error( LogUtil.throwableToString(ex) );
-        }
-    }
-    
-    public DatagramChannel getDatagramChannel()
-    {
-        return socketChannel_;
-    }
-    
-    private byte[] gobbleHeaderAndExtractBody(ByteBuffer buffer)
-    {
-        byte[] body = new byte[0];        
-        byte[] protocol = new byte[4];
-        buffer = buffer.get(protocol, 0, protocol.length);
-        int value = BasicUtilities.byteArrayToInt(protocol);
-        
-        if ( protocol_ != value )
-        {
-            logger_.info("Invalid protocol header in the incoming message " + value);
-            return body;
-        }
-        body = new byte[buffer.remaining()];
-        buffer.get(body, 0, body.length);       
-        return body;
-    }
-    
-    public void read(SelectionKey key)
-    {        
-        turnOffInterestOps(key, SelectionKey.OP_READ);
-        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
-        try
-        {
-            SocketAddress sa = socketChannel_.receive(buffer);
-            if ( sa == null )
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug("*** No datagram packet was available to be read ***");
-                return;
-            }            
-            buffer.flip();
-            
-            byte[] bytes = gobbleHeaderAndExtractBody(buffer);
-            if ( bytes.length > 0 )
-            {
-                DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-                Message message = Message.serializer().deserialize(dis);                
-                if ( message != null )
-                {                                        
-                    MessagingService.receive(message);
-                }
-            }
-        }
-        catch ( IOException ioe )
-        {
-            logger_.warn(LogUtil.throwableToString(ioe));
-        }
-        finally
-        {
-            turnOnInterestOps(key_, SelectionKey.OP_READ );
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.net.SocketAddress;
+import java.nio.*;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.net.io.ProtocolState;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class UdpConnection extends SelectionKeyHandler
+{
+    private static Logger logger_ = Logger.getLogger(UdpConnection.class);
+    private static final int BUFFER_SIZE = 4096;
+    private static final int protocol_ = 0xBADBEEF;
+    
+    private DatagramChannel socketChannel_;
+    private SelectionKey key_;    
+    private EndPoint localEndPoint_;
+    
+    public void init() throws IOException
+    {
+        socketChannel_ = DatagramChannel.open();
+        socketChannel_.socket().setReuseAddress(true);
+        socketChannel_.configureBlocking(false);        
+    }
+    
+    public void init(int port) throws IOException
+    {
+        // TODO: get TCP port from config and add one.
+        localEndPoint_ = new EndPoint(port);
+        socketChannel_ = DatagramChannel.open();
+        socketChannel_.socket().bind(localEndPoint_.getInetAddress());
+        socketChannel_.socket().setReuseAddress(true);
+        socketChannel_.configureBlocking(false);        
+        key_ = SelectorManager.getUdpSelectorManager().register(socketChannel_, this, SelectionKey.OP_READ);
+    }
+    
+    public boolean write(Message message, EndPoint to) throws IOException
+    {
+        boolean bVal = true;                       
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        Message.serializer().serialize(message, dos);        
+        byte[] data = bos.toByteArray();
+        if ( data.length > 0 )
+        {  
+            if (logger_.isTraceEnabled())
+                logger_.trace("Size of Gossip packet " + data.length);
+            byte[] protocol = BasicUtilities.intToByteArray(protocol_);
+            ByteBuffer buffer = ByteBuffer.allocate(data.length + protocol.length);
+            buffer.put( protocol );
+            buffer.put(data);
+            buffer.flip();
+            
+            int n  = socketChannel_.send(buffer, to.getInetAddress());
+            if ( n == 0 )
+            {
+                bVal = false;
+            }
+        }
+        return bVal;
+    }
+    
+    void close()
+    {
+        try
+        {
+            if ( socketChannel_ != null )
+                socketChannel_.close();
+        }
+        catch ( IOException ex )
+        {
+            logger_.error( LogUtil.throwableToString(ex) );
+        }
+    }
+    
+    public DatagramChannel getDatagramChannel()
+    {
+        return socketChannel_;
+    }
+    
+    private byte[] gobbleHeaderAndExtractBody(ByteBuffer buffer)
+    {
+        byte[] body = new byte[0];        
+        byte[] protocol = new byte[4];
+        buffer = buffer.get(protocol, 0, protocol.length);
+        int value = BasicUtilities.byteArrayToInt(protocol);
+        
+        if ( protocol_ != value )
+        {
+            logger_.info("Invalid protocol header in the incoming message " + value);
+            return body;
+        }
+        body = new byte[buffer.remaining()];
+        buffer.get(body, 0, body.length);       
+        return body;
+    }
+    
+    public void read(SelectionKey key)
+    {        
+        turnOffInterestOps(key, SelectionKey.OP_READ);
+        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+        try
+        {
+            SocketAddress sa = socketChannel_.receive(buffer);
+            if ( sa == null )
+            {
+                if (logger_.isDebugEnabled())
+                  logger_.debug("*** No datagram packet was available to be read ***");
+                return;
+            }            
+            buffer.flip();
+            
+            byte[] bytes = gobbleHeaderAndExtractBody(buffer);
+            if ( bytes.length > 0 )
+            {
+                DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+                Message message = Message.serializer().deserialize(dis);                
+                if ( message != null )
+                {                                        
+                    MessagingService.receive(message);
+                }
+            }
+        }
+        catch ( IOException ioe )
+        {
+            logger_.warn(LogUtil.throwableToString(ioe));
+        }
+        finally
+        {
+            turnOnInterestOps(key_, SelectionKey.OP_READ );
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentLengthState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentLengthState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentLengthState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentLengthState.java Thu Jul 30 15:30:21 2009
@@ -1,67 +1,67 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.io.IOException;
-
-import org.apache.cassandra.utils.FBUtilities;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class ContentLengthState extends StartState
-{
-    private ByteBuffer buffer_;
-
-    ContentLengthState(TcpReader stream)
-    {
-        super(stream);
-        buffer_ = ByteBuffer.allocate(4);
-    }
-
-    public byte[] read() throws IOException, ReadNotCompleteException
-    {        
-        return doRead(buffer_);
-    }
-
-    public void morphState() throws IOException
-    {
-        int size = FBUtilities.byteArrayToInt(buffer_.array());        
-        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT);
-        if ( nextState == null )
-        {
-            nextState = new ContentState(stream_, size);
-            stream_.putSocketState( TcpReader.TcpReaderState.CONTENT, nextState );
-        }
-        else
-        {               
-            nextState.setContextData(size);
-        }
-        stream_.morphState( nextState );
-        buffer_.clear();
-    }
-    
-    public void setContextData(Object data)
-    {
-        throw new UnsupportedOperationException("This method is not supported in the ContentLengthState");
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+
+import org.apache.cassandra.utils.FBUtilities;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ContentLengthState extends StartState
+{
+    private ByteBuffer buffer_;
+
+    ContentLengthState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(4);
+    }
+
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {        
+        return doRead(buffer_);
+    }
+
+    public void morphState() throws IOException
+    {
+        int size = FBUtilities.byteArrayToInt(buffer_.array());        
+        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT);
+        if ( nextState == null )
+        {
+            nextState = new ContentState(stream_, size);
+            stream_.putSocketState( TcpReader.TcpReaderState.CONTENT, nextState );
+        }
+        else
+        {               
+            nextState.setContextData(size);
+        }
+        stream_.morphState( nextState );
+        buffer_.clear();
+    }
+    
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ContentLengthState");
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentState.java Thu Jul 30 15:30:21 2009
@@ -1,84 +1,84 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.io.IOException;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class ContentState extends StartState
-{
-    private ByteBuffer buffer_;   
-    private int length_;
-
-    ContentState(TcpReader stream, int length)
-    {
-        super(stream);
-        length_ = length; 
-        buffer_ = ByteBuffer.allocate(length_);
-    }
-
-    public byte[] read() throws IOException, ReadNotCompleteException
-    {          
-        return doRead(buffer_);
-    }
-
-    public void morphState() throws IOException
-    {        
-        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
-        if ( nextState == null )
-        {
-            nextState = new DoneState(stream_, toBytes());
-            stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
-        }
-        else
-        {            
-            nextState.setContextData(toBytes());
-        }
-        stream_.morphState( nextState );               
-    }
-    
-    private byte[] toBytes()
-    {
-        buffer_.position(0); 
-        /*
-        ByteBuffer slice = buffer_.slice();        
-        return slice.array();
-        */  
-        byte[] bytes = new byte[length_];
-        buffer_.get(bytes, 0, length_);
-        return bytes;
-    }
-    
-    public void setContextData(Object data)
-    {
-        Integer value = (Integer)data;
-        length_ = value;               
-        buffer_.clear();
-        if ( buffer_.capacity() < length_ )
-            buffer_ = ByteBuffer.allocate(length_);
-        else
-        {            
-            buffer_.limit(length_);
-        }        
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class ContentState extends StartState
+{
+    private ByteBuffer buffer_;   
+    private int length_;
+
+    ContentState(TcpReader stream, int length)
+    {
+        super(stream);
+        length_ = length; 
+        buffer_ = ByteBuffer.allocate(length_);
+    }
+
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {          
+        return doRead(buffer_);
+    }
+
+    public void morphState() throws IOException
+    {        
+        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+        if ( nextState == null )
+        {
+            nextState = new DoneState(stream_, toBytes());
+            stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
+        }
+        else
+        {            
+            nextState.setContextData(toBytes());
+        }
+        stream_.morphState( nextState );               
+    }
+    
+    private byte[] toBytes()
+    {
+        buffer_.position(0); 
+        /*
+        ByteBuffer slice = buffer_.slice();        
+        return slice.array();
+        */  
+        byte[] bytes = new byte[length_];
+        buffer_.get(bytes, 0, length_);
+        return bytes;
+    }
+    
+    public void setContextData(Object data)
+    {
+        Integer value = (Integer)data;
+        length_ = value;               
+        buffer_.clear();
+        if ( buffer_.capacity() < length_ )
+            buffer_ = ByteBuffer.allocate(length_);
+        else
+        {            
+            buffer_.limit(length_);
+        }        
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ContentStreamState.java Thu Jul 30 15:30:21 2009
@@ -1,138 +1,138 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.SocketChannel;
-import java.io.*;
-
-import org.apache.cassandra.db.Table;
-import org.apache.log4j.Logger;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-
-class ContentStreamState extends StartState
-{       
-    private static Logger logger_ = Logger.getLogger(ContentStreamState.class);
-    private static long count_ = 64*1024*1024;
-    /* Return this byte array to exit event loop */
-    private static byte[] bytes_ = new byte[1];
-    private long bytesRead_ = 0L;
-    private FileChannel fc_;
-    private StreamContextManager.StreamContext streamContext_;
-    private StreamContextManager.StreamStatus streamStatus_;
-    
-    ContentStreamState(TcpReader stream)
-    {
-        super(stream); 
-        SocketChannel socketChannel = stream.getStream();
-        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
-        String remoteHost = remoteAddress.getHostName();        
-        streamContext_ = StreamContextManager.getStreamContext(remoteHost);   
-        streamStatus_ = StreamContextManager.getStreamStatus(remoteHost);
-    }
-    
-    private void createFileChannel() throws IOException
-    {
-        if ( fc_ == null )
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Creating file for " + streamContext_.getTargetFile());
-            FileOutputStream fos = new FileOutputStream( streamContext_.getTargetFile(), true );
-            fc_ = fos.getChannel();            
-        }
-    }
-
-    public byte[] read() throws IOException, ReadNotCompleteException
-    {        
-        SocketChannel socketChannel = stream_.getStream();
-        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
-        String remoteHost = remoteAddress.getHostName();  
-        createFileChannel();
-        if ( streamContext_ != null )
-        {  
-            try
-            {
-                bytesRead_ += fc_.transferFrom(socketChannel, bytesRead_, ContentStreamState.count_);
-                if ( bytesRead_ != streamContext_.getExpectedBytes() )
-                    throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
-            }
-            catch ( IOException ex )
-            {
-                /* Ask the source node to re-stream this file. */
-                streamStatus_.setAction(StreamContextManager.StreamCompletionAction.STREAM);                
-                handleStreamCompletion(remoteHost);
-                /* Delete the orphaned file. */
-                File file = new File(streamContext_.getTargetFile());
-                file.delete();
-                throw ex;
-            }
-            if ( bytesRead_ == streamContext_.getExpectedBytes() )
-            {       
-                if (logger_.isDebugEnabled())
-                    logger_.debug("Removing stream context " + streamContext_);                 
-                handleStreamCompletion(remoteHost);                              
-                bytesRead_ = 0L;
-                fc_.close();
-                morphState();
-            }                            
-        }
-        
-        return new byte[0];
-    }
-    
-    private void handleStreamCompletion(String remoteHost) throws IOException
-    {
-        /* 
-         * Streaming is complete. If all the data that has to be received inform the sender via 
-         * the stream completion callback so that the source may perform the requisite cleanup. 
-        */
-        IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
-        if ( streamComplete != null )
-        {                    
-            streamComplete.onStreamCompletion(remoteHost, streamContext_, streamStatus_);                    
-        }
-    }
-
-    public void morphState() throws IOException
-    {        
-        /* We instantiate an array of size 1 so that we can exit the event loop of the read. */                
-        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
-        if ( nextState == null )
-        {
-            nextState = new DoneState(stream_, ContentStreamState.bytes_);
-            stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
-        }
-        else
-        {
-            nextState.setContextData(ContentStreamState.bytes_);
-        }
-        stream_.morphState( nextState );  
-    }
-    
-    public void setContextData(Object data)
-    {
-        throw new UnsupportedOperationException("This method is not supported in the ContentStreamState");
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SocketChannel;
+import java.io.*;
+
+import org.apache.cassandra.db.Table;
+import org.apache.log4j.Logger;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+
+class ContentStreamState extends StartState
+{       
+    private static Logger logger_ = Logger.getLogger(ContentStreamState.class);
+    private static long count_ = 64*1024*1024;
+    /* Return this byte array to exit event loop */
+    private static byte[] bytes_ = new byte[1];
+    private long bytesRead_ = 0L;
+    private FileChannel fc_;
+    private StreamContextManager.StreamContext streamContext_;
+    private StreamContextManager.StreamStatus streamStatus_;
+    
+    ContentStreamState(TcpReader stream)
+    {
+        super(stream); 
+        SocketChannel socketChannel = stream.getStream();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        String remoteHost = remoteAddress.getHostName();        
+        streamContext_ = StreamContextManager.getStreamContext(remoteHost);   
+        streamStatus_ = StreamContextManager.getStreamStatus(remoteHost);
+    }
+    
+    private void createFileChannel() throws IOException
+    {
+        if ( fc_ == null )
+        {
+            if (logger_.isDebugEnabled())
+              logger_.debug("Creating file for " + streamContext_.getTargetFile());
+            FileOutputStream fos = new FileOutputStream( streamContext_.getTargetFile(), true );
+            fc_ = fos.getChannel();            
+        }
+    }
+
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {        
+        SocketChannel socketChannel = stream_.getStream();
+        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
+        String remoteHost = remoteAddress.getHostName();  
+        createFileChannel();
+        if ( streamContext_ != null )
+        {  
+            try
+            {
+                bytesRead_ += fc_.transferFrom(socketChannel, bytesRead_, ContentStreamState.count_);
+                if ( bytesRead_ != streamContext_.getExpectedBytes() )
+                    throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+            }
+            catch ( IOException ex )
+            {
+                /* Ask the source node to re-stream this file. */
+                streamStatus_.setAction(StreamContextManager.StreamCompletionAction.STREAM);                
+                handleStreamCompletion(remoteHost);
+                /* Delete the orphaned file. */
+                File file = new File(streamContext_.getTargetFile());
+                file.delete();
+                throw ex;
+            }
+            if ( bytesRead_ == streamContext_.getExpectedBytes() )
+            {       
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Removing stream context " + streamContext_);                 
+                handleStreamCompletion(remoteHost);                              
+                bytesRead_ = 0L;
+                fc_.close();
+                morphState();
+            }                            
+        }
+        
+        return new byte[0];
+    }
+    
+    private void handleStreamCompletion(String remoteHost) throws IOException
+    {
+        /* 
+         * Streaming is complete. If all the data that has to be received inform the sender via 
+         * the stream completion callback so that the source may perform the requisite cleanup. 
+        */
+        IStreamComplete streamComplete = StreamContextManager.getStreamCompletionHandler(remoteHost);
+        if ( streamComplete != null )
+        {                    
+            streamComplete.onStreamCompletion(remoteHost, streamContext_, streamStatus_);                    
+        }
+    }
+
+    public void morphState() throws IOException
+    {        
+        /* We instantiate an array of size 1 so that we can exit the event loop of the read. */                
+        StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.DONE);
+        if ( nextState == null )
+        {
+            nextState = new DoneState(stream_, ContentStreamState.bytes_);
+            stream_.putSocketState( TcpReader.TcpReaderState.DONE, nextState );
+        }
+        else
+        {
+            nextState.setContextData(ContentStreamState.bytes_);
+        }
+        stream_.morphState( nextState );  
+    }
+    
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ContentStreamState");
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/DoneState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/DoneState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/DoneState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/DoneState.java Thu Jul 30 15:30:21 2009
@@ -1,52 +1,52 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.nio.channels.SocketChannel;
-import java.io.IOException;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-class DoneState extends StartState
-{
-    private byte[] bytes_ = new byte[0];
-
-    DoneState(TcpReader stream, byte[] bytes)
-    {
-        super(stream);
-        bytes_ = bytes;
-    }
-
-    public byte[] read() throws IOException, ReadNotCompleteException
-    {        
-        morphState();
-        return bytes_;
-    }
-
-    public void morphState() throws IOException
-    {                       
-        stream_.morphState(null);
-    }
-    
-    public void setContextData(Object data)
-    {                
-        bytes_ = (byte[])data;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class DoneState extends StartState
+{
+    private byte[] bytes_ = new byte[0];
+
+    DoneState(TcpReader stream, byte[] bytes)
+    {
+        super(stream);
+        bytes_ = bytes;
+    }
+
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {        
+        morphState();
+        return bytes_;
+    }
+
+    public void morphState() throws IOException
+    {                       
+        stream_.morphState(null);
+    }
+    
+    public void setContextData(Object data)
+    {                
+        bytes_ = (byte[])data;
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/FastSerializer.java Thu Jul 30 15:30:21 2009
@@ -1,46 +1,46 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.io.IOException;
-
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.Message;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class FastSerializer implements ISerializer
-{ 
-    public byte[] serialize(Message message) throws IOException
-    {
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        Message.serializer().serialize(message, buffer);
-        return buffer.getData();
-    }
-    
-    public Message deserialize(byte[] bytes) throws IOException
-    {
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(bytes, bytes.length);
-        return Message.serializer().deserialize(bufIn);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.net.Message;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class FastSerializer implements ISerializer
+{ 
+    public byte[] serialize(Message message) throws IOException
+    {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        Message.serializer().serialize(message, buffer);
+        return buffer.getData();
+    }
+    
+    public Message deserialize(byte[] bytes) throws IOException
+    {
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(bytes, bytes.length);
+        return Message.serializer().deserialize(bufIn);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ISerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ISerializer.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ISerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ISerializer.java Thu Jul 30 15:30:21 2009
@@ -1,32 +1,32 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.io.IOException;
-
-import org.apache.cassandra.net.Message;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface ISerializer
-{
-    public byte[] serialize(Message message) throws IOException;
-    public Message deserialize(byte[] bytes) throws IOException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.Message;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface ISerializer
+{
+    public byte[] serialize(Message message) throws IOException;
+    public Message deserialize(byte[] bytes) throws IOException;
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/IStreamComplete.java Thu Jul 30 15:30:21 2009
@@ -1,36 +1,36 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.io.IOException;
-
-import org.apache.cassandra.net.EndPoint;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface IStreamComplete
-{
-    /*
-     * This callback if registered with the StreamContextManager is 
-     * called when the stream from a host is completely handled. 
-    */
-    public void onStreamCompletion(String from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.io.IOException;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IStreamComplete
+{
+    /*
+     * This callback if registered with the StreamContextManager is 
+     * called when the stream from a host is completely handled. 
+    */
+    public void onStreamCompletion(String from, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException;
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolHeaderState.java Thu Jul 30 15:30:21 2009
@@ -1,103 +1,103 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import org.apache.cassandra.utils.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.io.IOException;
-import org.apache.cassandra.net.MessagingService;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ProtocolHeaderState extends StartState
-{
-    private ByteBuffer buffer_;
-
-    public ProtocolHeaderState(TcpReader stream)
-    {
-        super(stream);
-        buffer_ = ByteBuffer.allocate(4);
-    }
-
-    public byte[] read() throws IOException, ReadNotCompleteException
-    {        
-        return doRead(buffer_);
-    }
-
-    public void morphState() throws IOException
-    {
-        byte[] protocolHeader = buffer_.array();
-        int pH = MessagingService.byteArrayToInt(protocolHeader);
-        
-        int type = MessagingService.getBits(pH, 1, 2);
-        stream_.getProtocolHeader().serializerType_ = type;
-        
-        int stream = MessagingService.getBits(pH, 3, 1);
-        stream_.getProtocolHeader().isStreamingMode_ = (stream == 1) ? true : false;
-        
-        if ( stream_.getProtocolHeader().isStreamingMode_ )
-            MessagingService.setStreamingMode(true);
-        
-        int listening = MessagingService.getBits(pH, 4, 1);
-        stream_.getProtocolHeader().isListening_ = (listening == 1) ? true : false;
-        
-        int version = MessagingService.getBits(pH, 15, 8);
-        stream_.getProtocolHeader().version_ = version;
-        
-        if ( version <= MessagingService.getVersion() )
-        {
-            if ( stream_.getProtocolHeader().isStreamingMode_ )
-            { 
-                StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_STREAM);
-                if ( nextState == null )
-                {
-                    nextState = new ContentStreamState(stream_);
-                    stream_.putSocketState( TcpReader.TcpReaderState.CONTENT_STREAM, nextState );
-                }
-                stream_.morphState( nextState );
-                buffer_.clear();
-            }
-            else
-            {
-                StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_LENGTH);
-                if ( nextState == null )
-                {
-                    nextState = new ContentLengthState(stream_);
-                    stream_.putSocketState( TcpReader.TcpReaderState.CONTENT_LENGTH, nextState );
-                }                
-                stream_.morphState( nextState );   
-                buffer_.clear();
-            }            
-        }
-        else
-        {
-            throw new IOException("Invalid version in message. Scram.");
-        }
-    }
-    
-    public void setContextData(Object data)
-    {
-        throw new UnsupportedOperationException("This method is not supported in the ProtocolHeaderState");
-    }
-}
-
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ProtocolHeaderState extends StartState
+{
+    private ByteBuffer buffer_;
+
+    public ProtocolHeaderState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(4);
+    }
+
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {        
+        return doRead(buffer_);
+    }
+
+    public void morphState() throws IOException
+    {
+        byte[] protocolHeader = buffer_.array();
+        int pH = MessagingService.byteArrayToInt(protocolHeader);
+        
+        int type = MessagingService.getBits(pH, 1, 2);
+        stream_.getProtocolHeader().serializerType_ = type;
+        
+        int stream = MessagingService.getBits(pH, 3, 1);
+        stream_.getProtocolHeader().isStreamingMode_ = (stream == 1) ? true : false;
+        
+        if ( stream_.getProtocolHeader().isStreamingMode_ )
+            MessagingService.setStreamingMode(true);
+        
+        int listening = MessagingService.getBits(pH, 4, 1);
+        stream_.getProtocolHeader().isListening_ = (listening == 1) ? true : false;
+        
+        int version = MessagingService.getBits(pH, 15, 8);
+        stream_.getProtocolHeader().version_ = version;
+        
+        if ( version <= MessagingService.getVersion() )
+        {
+            if ( stream_.getProtocolHeader().isStreamingMode_ )
+            { 
+                StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_STREAM);
+                if ( nextState == null )
+                {
+                    nextState = new ContentStreamState(stream_);
+                    stream_.putSocketState( TcpReader.TcpReaderState.CONTENT_STREAM, nextState );
+                }
+                stream_.morphState( nextState );
+                buffer_.clear();
+            }
+            else
+            {
+                StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.CONTENT_LENGTH);
+                if ( nextState == null )
+                {
+                    nextState = new ContentLengthState(stream_);
+                    stream_.putSocketState( TcpReader.TcpReaderState.CONTENT_LENGTH, nextState );
+                }                
+                stream_.morphState( nextState );   
+                buffer_.clear();
+            }            
+        }
+        else
+        {
+            throw new IOException("Invalid version in message. Scram.");
+        }
+    }
+    
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ProtocolHeaderState");
+    }
+}
+
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ProtocolState.java Thu Jul 30 15:30:21 2009
@@ -1,71 +1,71 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import org.apache.cassandra.utils.*;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.io.IOException;
-import org.apache.cassandra.net.MessagingService;
-
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ProtocolState extends StartState
-{
-    private ByteBuffer buffer_;
-
-    public ProtocolState(TcpReader stream)
-    {
-        super(stream);
-        buffer_ = ByteBuffer.allocate(16);
-    }
-
-    public byte[] read() throws IOException, ReadNotCompleteException
-    {        
-        return doRead(buffer_);
-    }
-
-    public void morphState() throws IOException
-    {
-        byte[] protocol = buffer_.array();
-        if ( MessagingService.isProtocolValid(protocol) )
-        {            
-            StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.PROTOCOL);
-            if ( nextState == null )
-            {
-                nextState = new ProtocolHeaderState(stream_);
-                stream_.putSocketState( TcpReader.TcpReaderState.PROTOCOL, nextState );
-            }
-            stream_.morphState( nextState ); 
-            buffer_.clear();
-        }
-        else
-        {
-            throw new IOException("Invalid protocol header. The preamble seems to be messed up.");
-        }
-    }
-    
-    public void setContextData(Object data)
-    {
-        throw new UnsupportedOperationException("This method is not supported in the ProtocolState");
-    }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import org.apache.cassandra.utils.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.io.IOException;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ProtocolState extends StartState
+{
+    private ByteBuffer buffer_;
+
+    public ProtocolState(TcpReader stream)
+    {
+        super(stream);
+        buffer_ = ByteBuffer.allocate(16);
+    }
+
+    public byte[] read() throws IOException, ReadNotCompleteException
+    {        
+        return doRead(buffer_);
+    }
+
+    public void morphState() throws IOException
+    {
+        byte[] protocol = buffer_.array();
+        if ( MessagingService.isProtocolValid(protocol) )
+        {            
+            StartState nextState = stream_.getSocketState(TcpReader.TcpReaderState.PROTOCOL);
+            if ( nextState == null )
+            {
+                nextState = new ProtocolHeaderState(stream_);
+                stream_.putSocketState( TcpReader.TcpReaderState.PROTOCOL, nextState );
+            }
+            stream_.morphState( nextState ); 
+            buffer_.clear();
+        }
+        else
+        {
+            throw new IOException("Invalid protocol header. The preamble seems to be messed up.");
+        }
+    }
+    
+    public void setContextData(Object data)
+    {
+        throw new UnsupportedOperationException("This method is not supported in the ProtocolState");
+    }
+}
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ReadNotCompleteException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ReadNotCompleteException.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ReadNotCompleteException.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/ReadNotCompleteException.java Thu Jul 30 15:30:21 2009
@@ -1,34 +1,34 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-/**
- * Created by IntelliJ IDEA.
- * User: lakshman
- * Date: Aug 22, 2005
- * Time: 11:37:31 AM
- * To change this template use File | Settings | File Templates.
- */
-public class ReadNotCompleteException extends Exception
-{
-    ReadNotCompleteException(String message)
-    {
-        super(message);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: lakshman
+ * Date: Aug 22, 2005
+ * Time: 11:37:31 AM
+ * To change this template use File | Settings | File Templates.
+ */
+public class ReadNotCompleteException extends Exception
+{
+    ReadNotCompleteException(String message)
+    {
+        super(message);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerAttribute.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerAttribute.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerAttribute.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerAttribute.java Thu Jul 30 15:30:21 2009
@@ -1,27 +1,27 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.lang.annotation.*;
-
-@Retention(RetentionPolicy.RUNTIME)
-public @interface SerializerAttribute
-{
-    SerializerType value();
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.lang.annotation.*;
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SerializerAttribute
+{
+    SerializerType value();
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerType.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerType.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/SerializerType.java Thu Jul 30 15:30:21 2009
@@ -1,27 +1,27 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-public enum SerializerType
-{
-    BINARY,
-    JAVA,
-    XML,
-    JSON
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+public enum SerializerType
+{
+    BINARY,
+    JAVA,
+    XML,
+    JSON
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StartState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StartState.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StartState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StartState.java Thu Jul 30 15:30:21 2009
@@ -1,59 +1,59 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.net.io;
-
-import java.nio.channels.SocketChannel;
-import java.nio.ByteBuffer;
-import java.io.IOException;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public abstract class StartState
-{
-    protected TcpReader stream_;
-
-    public StartState(TcpReader stream)
-    {
-        stream_ = stream;
-    }
-
-    public abstract byte[] read() throws IOException, ReadNotCompleteException;
-    public abstract void morphState() throws IOException;
-    public abstract void setContextData(Object data);
-
-    protected byte[] doRead(ByteBuffer buffer) throws IOException, ReadNotCompleteException
-    {        
-        SocketChannel socketChannel = stream_.getStream();
-        int bytesRead = socketChannel.read(buffer);     
-        if ( bytesRead == -1 && buffer.remaining() > 0 )
-        {            
-            throw new IOException("Reached an EOL or something bizzare occured. Reading from: " + socketChannel.socket().getInetAddress() + " BufferSizeRemaining: " + buffer.remaining());
-        }
-        if ( buffer.remaining() == 0 )
-        {
-            morphState();
-        }
-        else
-        {            
-            throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
-        }
-        return new byte[0];
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.io;
+
+import java.nio.channels.SocketChannel;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public abstract class StartState
+{
+    protected TcpReader stream_;
+
+    public StartState(TcpReader stream)
+    {
+        stream_ = stream;
+    }
+
+    public abstract byte[] read() throws IOException, ReadNotCompleteException;
+    public abstract void morphState() throws IOException;
+    public abstract void setContextData(Object data);
+
+    protected byte[] doRead(ByteBuffer buffer) throws IOException, ReadNotCompleteException
+    {        
+        SocketChannel socketChannel = stream_.getStream();
+        int bytesRead = socketChannel.read(buffer);     
+        if ( bytesRead == -1 && buffer.remaining() > 0 )
+        {            
+            throw new IOException("Reached an EOL or something bizzare occured. Reading from: " + socketChannel.socket().getInetAddress() + " BufferSizeRemaining: " + buffer.remaining());
+        }
+        if ( buffer.remaining() == 0 )
+        {
+            morphState();
+        }
+        else
+        {            
+            throw new ReadNotCompleteException("Specified number of bytes have not been read from the Socket Channel");
+        }
+        return new byte[0];
+    }
+}