Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC

svn commit: r749205 [14/16] - in /incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/ config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/ cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/...

Added: incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+/**
+ * An extension of DataOutputStream that writes into a growable in-memory buffer. This class
+ * is completely thread unsafe.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class DataOutputBuffer extends DataOutputStream
+{
+    /*
+     * This is a clone of the ByteArrayOutputStream but w/o the unnecessary
+     * synchronized keyword usage.
+    */
+    public static class FastByteArrayOutputStream extends OutputStream
+    {
+        
+        /**
+         * The buffer where data is stored.
+         */
+        protected byte buf[];
+        
+        /**
+         * The number of valid bytes in the buffer.
+         */
+        protected int count;
+        
+        /**
+         * Creates a new byte array output stream. The buffer capacity is
+         * initially 32 bytes, though its size increases if necessary.
+         */
+        public FastByteArrayOutputStream()
+        {
+            this(32);
+        }
+        
+        /**
+         * Creates a new byte array output stream, with a buffer capacity of the
+         * specified size, in bytes.
+         * 
+         * @param size
+         *            the initial size.
+         * @exception IllegalArgumentException
+         *                if size is negative.
+         */
+        public FastByteArrayOutputStream(int size)
+        {
+            if (size < 0)
+            {
+                throw new IllegalArgumentException("Negative initial size: "
+                        + size);
+            }
+            buf = new byte[size];
+        }
+        
+        /**
+         * Writes the specified byte to this byte array output stream.
+         * 
+         * @param b
+         *            the byte to be written.
+         */
+        public void write(int b)
+        {
+            int newcount = count + 1;
+            if (newcount > buf.length)
+            {
+                buf = Arrays.copyOf(buf, Math.max(buf.length << 1, newcount));
+            }
+            buf[count] = (byte) b;
+            count = newcount;
+        }
+        
+        /**
+         * Writes <code>len</code> bytes from the specified byte array
+         * starting at offset <code>off</code> to this byte array output
+         * stream.
+         * 
+         * @param b
+         *            the data.
+         * @param off
+         *            the start offset in the data.
+         * @param len
+         *            the number of bytes to write.
+         */
+        public void write(byte b[], int off, int len)
+        {
+            if ((off < 0) || (off > b.length) || (len < 0)
+                    || ((off + len) > b.length) || ((off + len) < 0))
+            {
+                throw new IndexOutOfBoundsException();
+            }
+            else if (len == 0)
+            {
+                return;
+            }
+            int newcount = count + len;
+            if (newcount > buf.length)
+            {
+                buf = Arrays.copyOf(buf, Math.max(buf.length << 1, newcount));
+            }
+            System.arraycopy(b, off, buf, count, len);
+            count = newcount;
+        }
+        
+        /**
+         * Writes the complete contents of this byte array output stream to the
+         * specified output stream argument, as if by calling the output
+         * stream's write method using <code>out.write(buf, 0, count)</code>.
+         * 
+         * @param out
+         *            the output stream to which to write the data.
+         * @exception IOException
+         *                if an I/O error occurs.
+         */
+        public void writeTo(OutputStream out) throws IOException
+        {
+            out.write(buf, 0, count);
+        }
+        
+        /**
+         * Resets the <code>count</code> field of this byte array output
+         * stream to zero, so that all currently accumulated output in the
+         * output stream is discarded. The output stream can be used again,
+         * reusing the already allocated buffer space.
+         * 
+         * @see java.io.ByteArrayInputStream#count
+         */
+        public void reset()
+        {
+            count = 0;
+        }
+        
+        /**
+         * Creates a newly allocated byte array. Its size is the current size of
+         * this output stream and the valid contents of the buffer have been
+         * copied into it.
+         * 
+         * @return the current contents of this output stream, as a byte array.
+         * @see java.io.ByteArrayOutputStream#size()
+         */
+        public byte[] toByteArray()
+        {
+            return Arrays.copyOf(buf, count);
+        }
+        
+        /**
+         * Returns the current size of the buffer.
+         * 
+         * @return the value of the <code>count</code> field, which is the
+         *         number of valid bytes in this output stream.
+         * @see java.io.ByteArrayOutputStream#count
+         */
+        public int size()
+        {
+            return count;
+        }
+        
+        /**
+         * Converts the buffer's contents into a string decoding bytes using the
+         * platform's default character set. The length of the new
+         * <tt>String</tt> is a function of the character set, and hence may
+         * not be equal to the size of the buffer.
+         * 
+         * <p>
+         * This method always replaces malformed-input and unmappable-character
+         * sequences with the default replacement string for the platform's
+         * default character set. The
+         * {@linkplain java.nio.charset.CharsetDecoder} class should be used
+         * when more control over the decoding process is required.
+         * 
+         * @return String decoded from the buffer's contents.
+         * @since JDK1.1
+         */
+        public String toString()
+        {
+            return new String(buf, 0, count);
+        }
+        
+        /**
+         * Converts the buffer's contents into a string by decoding the bytes
+         * using the specified {@link java.nio.charset.Charset charsetName}.
+         * The length of the new <tt>String</tt> is a function of the charset,
+         * and hence may not be equal to the length of the byte array.
+         * 
+         * <p>
+         * This method always replaces malformed-input and unmappable-character
+         * sequences with this charset's default replacement string. The {@link
+         * java.nio.charset.CharsetDecoder} class should be used when more
+         * control over the decoding process is required.
+         * 
+         * @param charsetName
+         *            the name of a supported
+         *            {@linkplain java.nio.charset.Charset charset}
+         * @return String decoded from the buffer's contents.
+         * @exception UnsupportedEncodingException
+         *                If the named charset is not supported
+         * @since JDK1.1
+         */
+        public String toString(String charsetName) throws UnsupportedEncodingException
+        {
+            return new String(buf, 0, count, charsetName);
+        }
+        
+        /**
+         * Creates a newly allocated string. Its size is the current size of the
+         * output stream and the valid contents of the buffer have been copied
+         * into it. Each character <i>c</i> in the resulting string is
+         * constructed from the corresponding element <i>b</i> in the byte
+         * array such that: <blockquote>
+         * 
+         * <pre>
+         * c == (char) (((hibyte &amp; 0xff) &lt;&lt; 8) | (b &amp; 0xff))
+         * </pre>
+         * 
+         * </blockquote>
+         * 
+         * @deprecated This method does not properly convert bytes into
+         *             characters. As of JDK&nbsp;1.1, the preferred way to do
+         *             this is via the <code>toString(String enc)</code>
+         *             method, which takes an encoding-name argument, or the
+         *             <code>toString()</code> method, which uses the
+         *             platform's default character encoding.
+         * 
+         * @param hibyte
+         *            the high byte of each resulting Unicode character.
+         * @return the current contents of the output stream, as a string.
+         * @see java.io.ByteArrayOutputStream#size()
+         * @see java.io.ByteArrayOutputStream#toString(String)
+         * @see java.io.ByteArrayOutputStream#toString()
+         */
+        @Deprecated
+        public String toString(int hibyte)
+        {
+            return new String(buf, hibyte, 0, count);
+        }
+        
+        /**
+         * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods
+         * in this class can be called after the stream has been closed without
+         * generating an <tt>IOException</tt>.
+         * <p>
+         * 
+         */
+        public void close() throws IOException
+        {
+        }
+        
+    }
+      
+    private static class Buffer extends FastByteArrayOutputStream
+    {
+        public byte[] getData()
+        {
+            return buf;
+        }
+        
+        public int getLength()
+        {
+            return count;
+        }
+        
+        public void reset()
+        {
+            count = 0;
+        }
+        
+        public void write(DataInput in, int len) throws IOException
+        {
+            int newcount = count + len;
+            if (newcount > buf.length)
+            {
+                byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+                System.arraycopy(buf, 0, newbuf, 0, count);
+                buf = newbuf;
+            }
+            long start = System.currentTimeMillis();
+            in.readFully(buf, count, len);
+            count = newcount;
+        }
+        
+        public void write(ByteBuffer buffer, int len) throws IOException
+        {
+            int newcount = count + len;
+            if (newcount > buf.length)
+            {
+                byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+                System.arraycopy(buf, 0, newbuf, 0, count);
+                buf = newbuf;
+            }
+            long start = System.currentTimeMillis();
+            buffer.get(buf, count, len);            
+            count = newcount;
+        }
+    }
+    
+    private Buffer buffer;
+    
+    /** Constructs a new empty buffer. */
+    public DataOutputBuffer()
+    {
+        this(new Buffer());
+    }
+    
+    private DataOutputBuffer(Buffer buffer)
+    {
+        super(buffer);
+        this.buffer = buffer;
+    }
+    
+    /**
+     * Returns the current contents of the buffer. Data is only valid to
+     * {@link #getLength()}.
+     */
+    public byte[] getData()
+    {
+        return buffer.getData();
+    }
+    
+    /** Returns the length of the valid data currently in the buffer. */
+    public int getLength()
+    {
+        return buffer.getLength();
+    }
+    
+    /** Resets the buffer to empty. */
+    public DataOutputBuffer reset()
+    {
+        this.written = 0;
+        buffer.reset();
+        return this;
+    }
+    
+    /** Writes bytes from a DataInput directly into the buffer. */
+    public void write(DataInput in, int length) throws IOException
+    {
+        buffer.write(in, length);
+    }   
+    
+    /** Writes bytes from a ByteBuffer directly into the buffer. */
+    public void write(ByteBuffer in, int length) throws IOException
+    {
+        buffer.write(in, length);
+    }   
+}
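
To make the intent of the class concrete, here is a minimal usage sketch (it is not part of
the commit above); the class name, the values written, and the printed message are invented
for illustration, but the methods used are the ones defined in DataOutputBuffer:

    import java.io.IOException;
    import org.apache.cassandra.io.DataOutputBuffer;

    public class DataOutputBufferExample
    {
        public static void main(String[] args) throws IOException
        {
            DataOutputBuffer bufOut = new DataOutputBuffer();

            // DataOutputBuffer extends DataOutputStream, so the usual write methods apply.
            bufOut.writeUTF("row-key");
            bufOut.writeInt(42);

            // Only bytes [0, getLength()) of getData() hold valid data; the backing
            // array itself may be larger.
            byte[] data = bufOut.getData();
            int length = bufOut.getLength();
            System.out.println("serialized " + length + " bytes, backing array of " + data.length);

            // reset() empties the buffer but keeps the allocated array for reuse.
            bufOut.reset();
        }
    }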

Added: incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * A <code>BufferedInputStream</code> adds functionality to another input
+ * stream, namely the ability to buffer the input and to support the
+ * <code>mark</code> and <code>reset</code> methods. When the
+ * <code>BufferedInputStream</code> is created, an internal buffer array is
+ * created. As bytes from the stream are read or skipped, the internal buffer is
+ * refilled as necessary from the contained input stream, many bytes at a time.
+ * The <code>mark</code> operation remembers a point in the input stream and
+ * the <code>reset</code> operation causes all the bytes read since the most
+ * recent <code>mark</code> operation to be reread before new bytes are taken
+ * from the contained input stream.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedInputStream extends FilterInputStream
+{
+    
+    private static int defaultBufferSize = 8192;
+    
+    /**
+     * The internal buffer array where the data is stored. When necessary, it
+     * may be replaced by another array of a different size.
+     */
+    protected volatile byte buf[];
+    
+    /**
+     * Atomic updater to provide compareAndSet for buf. This is necessary
+     * because closes can be asynchronous. We use the nullness of buf as the primary
+     * indicator that this stream is closed. (The "in" field is also nulled out
+     * on close.)
+     */
+    private static final AtomicReferenceFieldUpdater<FastBufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater
+    .newUpdater(FastBufferedInputStream.class, byte[].class, "buf");
+    
+    /**
+     * The index one greater than the index of the last valid byte in the
+     * buffer. This value is always in the range <code>0</code> through
+     * <code>buf.length</code>; elements <code>buf[0]</code> through
+     * <code>buf[count-1]
+     * </code>contain buffered input data obtained from the
+     * underlying input stream.
+     */
+    protected int count;
+    
+    /**
+     * The current position in the buffer. This is the index of the next
+     * character to be read from the <code>buf</code> array.
+     * <p>
+     * This value is always in the range <code>0</code> through
+     * <code>count</code>. If it is less than <code>count</code>, then
+     * <code>buf[pos]</code> is the next byte to be supplied as input; if it
+     * is equal to <code>count</code>, then the next <code>read</code> or
+     * <code>skip</code> operation will require more bytes to be read from the
+     * contained input stream.
+     * 
+     * @see java.io.BufferedInputStream#buf
+     */
+    protected int pos;
+    
+    /**
+     * The value of the <code>pos</code> field at the time the last
+     * <code>mark</code> method was called.
+     * <p>
+     * This value is always in the range <code>-1</code> through
+     * <code>pos</code>. If there is no marked position in the input stream,
+     * this field is <code>-1</code>. If there is a marked position in the
+     * input stream, then <code>buf[markpos]</code> is the first byte to be
+     * supplied as input after a <code>reset</code> operation. If
+     * <code>markpos</code> is not <code>-1</code>, then all bytes from
+     * positions <code>buf[markpos]</code> through <code>buf[pos-1]</code>
+     * must remain in the buffer array (though they may be moved to another
+     * place in the buffer array, with suitable adjustments to the values of
+     * <code>count</code>, <code>pos</code>, and <code>markpos</code>);
+     * they may not be discarded unless and until the difference between
+     * <code>pos</code> and <code>markpos</code> exceeds
+     * <code>marklimit</code>.
+     * 
+     * @see java.io.BufferedInputStream#mark(int)
+     * @see java.io.BufferedInputStream#pos
+     */
+    protected int markpos = -1;
+    
+    /**
+     * The maximum read ahead allowed after a call to the <code>mark</code>
+     * method before subsequent calls to the <code>reset</code> method fail.
+     * Whenever the difference between <code>pos</code> and
+     * <code>markpos</code> exceeds <code>marklimit</code>, then the mark
+     * may be dropped by setting <code>markpos</code> to <code>-1</code>.
+     * 
+     * @see java.io.BufferedInputStream#mark(int)
+     * @see java.io.BufferedInputStream#reset()
+     */
+    protected int marklimit;
+    
+    /**
+     * Check to make sure that the underlying input stream has not been nulled out
+     * due to close; if not, return it.
+     */
+    private InputStream getInIfOpen() throws IOException
+    {
+        InputStream input = in;
+        if (input == null)
+            throw new IOException("Stream closed");
+        return input;
+    }
+    
+    /**
+     * Check to make sure that the buffer has not been nulled out due to close; if
+     * not, return it.
+     */
+    private byte[] getBufIfOpen() throws IOException
+    {
+        byte[] buffer = buf;
+        if (buffer == null)
+            throw new IOException("Stream closed");
+        return buffer;
+    }
+    
+    /**
+     * Creates a <code>BufferedInputStream</code> and saves its argument, the
+     * input stream <code>in</code>, for later use. An internal buffer array
+     * is created and stored in <code>buf</code>.
+     * 
+     * @param in
+     *            the underlying input stream.
+     */
+    public FastBufferedInputStream(InputStream in)
+    {
+        this(in, defaultBufferSize);
+    }
+    
+    /**
+     * Creates a <code>BufferedInputStream</code> with the specified buffer
+     * size, and saves its argument, the input stream <code>in</code>, for
+     * later use. An internal buffer array of length <code>size</code> is
+     * created and stored in <code>buf</code>.
+     * 
+     * @param in
+     *            the underlying input stream.
+     * @param size
+     *            the buffer size.
+     * @exception IllegalArgumentException
+     *                if size <= 0.
+     */
+    public FastBufferedInputStream(InputStream in, int size)
+    {
+        super(in);
+        if (size <= 0)
+        {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+    
+    /**
+     * Fills the buffer with more data, taking into account shuffling and other
+     * tricks for dealing with marks. Assumes that it is being called by a
+     * synchronized method. This method also assumes that all data has already
+     * been read in, hence pos >= count.
+     */
+    private void fill() throws IOException
+    {
+        byte[] buffer = getBufIfOpen();
+        if (markpos < 0)
+            pos = 0; /* no mark: throw away the buffer */
+        else if (pos >= buffer.length) /* no room left in buffer */
+            if (markpos > 0)
+            { /* can throw away early part of the buffer */
+                int sz = pos - markpos;
+                System.arraycopy(buffer, markpos, buffer, 0, sz);
+                pos = sz;
+                markpos = 0;
+            }
+            else if (buffer.length >= marklimit)
+            {
+                markpos = -1; /* buffer got too big, invalidate mark */
+                pos = 0; /* drop buffer contents */
+            }
+            else
+            { /* grow buffer */
+                int nsz = pos * 2;
+                if (nsz > marklimit)
+                    nsz = marklimit;
+                byte nbuf[] = new byte[nsz];
+                System.arraycopy(buffer, 0, nbuf, 0, pos);
+                if (!bufUpdater.compareAndSet(this, buffer, nbuf))
+                {
+                    // Can't replace buf if there was an async close.
+                    // Note: This would need to be changed if fill()
+                    // is ever made accessible to multiple threads.
+                    // But for now, the only way CAS can fail is via close.
+                    // assert buf == null;
+                    throw new IOException("Stream closed");
+                }
+                buffer = nbuf;
+            }
+        count = pos;
+        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
+        if (n > 0)
+            count = n + pos;
+    }
+    
+    /**
+     * See the general contract of the <code>read</code> method of
+     * <code>InputStream</code>.
+     * 
+     * @return the next byte of data, or <code>-1</code> if the end of the
+     *         stream is reached.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     * @see java.io.FilterInputStream#in
+     */
+    public int read() throws IOException
+    {
+        if (pos >= count)
+        {
+            fill();
+            if (pos >= count)
+                return -1;
+        }
+        return getBufIfOpen()[pos++] & 0xff;
+    }
+    
+    /**
+     * Read characters into a portion of an array, reading from the underlying
+     * stream at most once if necessary.
+     */
+    private int read1(byte[] b, int off, int len) throws IOException
+    {
+        int avail = count - pos;
+        if (avail <= 0)
+        {
+            /*
+             * If the requested length is at least as large as the buffer, and
+             * if there is no mark/reset activity, do not bother to copy the
+             * bytes into the local buffer. In this way buffered streams will
+             * cascade harmlessly.
+             */
+            if (len >= getBufIfOpen().length && markpos < 0)
+            {
+                return getInIfOpen().read(b, off, len);
+            }
+            fill();
+            avail = count - pos;
+            if (avail <= 0)
+                return -1;
+        }
+        int cnt = (avail < len) ? avail : len;
+        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
+        pos += cnt;
+        return cnt;
+    }
+    
+    /**
+     * Reads bytes from this byte-input stream into the specified byte array,
+     * starting at the given offset.
+     * 
+     * <p>
+     * This method implements the general contract of the corresponding
+     * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
+     * the <code>{@link InputStream}</code> class. As an additional
+     * convenience, it attempts to read as many bytes as possible by repeatedly
+     * invoking the <code>read</code> method of the underlying stream. This
+     * iterated <code>read</code> continues until one of the following
+     * conditions becomes true:
+     * <ul>
+     * 
+     * <li> The specified number of bytes have been read,
+     * 
+     * <li> The <code>read</code> method of the underlying stream returns
+     * <code>-1</code>, indicating end-of-file, or
+     * 
+     * <li> The <code>available</code> method of the underlying stream returns
+     * zero, indicating that further input requests would block.
+     * 
+     * </ul>
+     * If the first <code>read</code> on the underlying stream returns
+     * <code>-1</code> to indicate end-of-file then this method returns
+     * <code>-1</code>. Otherwise this method returns the number of bytes
+     * actually read.
+     * 
+     * <p>
+     * Subclasses of this class are encouraged, but not required, to attempt to
+     * read as many bytes as possible in the same fashion.
+     * 
+     * @param b
+     *            destination buffer.
+     * @param off
+     *            offset at which to start storing bytes.
+     * @param len
+     *            maximum number of bytes to read.
+     * @return the number of bytes read, or <code>-1</code> if the end of the
+     *         stream has been reached.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     */
+    public int read(byte b[], int off, int len) throws IOException
+    {
+        getBufIfOpen(); // Check for closed stream
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0)
+        {
+            throw new IndexOutOfBoundsException();
+        }
+        else if (len == 0)
+        {
+            return 0;
+        }
+        
+        int n = 0;
+        for (;;)
+        {
+            int nread = read1(b, off + n, len - n);
+            if (nread <= 0)
+                return (n == 0) ? nread : n;
+            n += nread;
+            if (n >= len)
+                return n;
+            // if not closed but no bytes available, return
+            InputStream input = in;
+            if (input != null && input.available() <= 0)
+                return n;
+        }
+    }
+    
+    /**
+     * See the general contract of the <code>skip</code> method of
+     * <code>InputStream</code>.
+     * 
+     * @exception IOException
+     *                if the stream does not support seek, or if this input
+     *                stream has been closed by invoking its {@link #close()}
+     *                method, or an I/O error occurs.
+     */
+    public long skip(long n) throws IOException
+    {
+        getBufIfOpen(); // Check for closed stream
+        if (n <= 0)
+        {
+            return 0;
+        }
+        long avail = count - pos;
+        
+        if (avail <= 0)
+        {
+            // If no mark position set then don't keep in buffer
+            if (markpos < 0)
+                return getInIfOpen().skip(n);
+            
+            // Fill in buffer to save bytes for reset
+            fill();
+            avail = count - pos;
+            if (avail <= 0)
+                return 0;
+        }
+        
+        long skipped = (avail < n) ? avail : n;
+        pos += skipped;
+        return skipped;
+    }
+    
+    /**
+     * Returns an estimate of the number of bytes that can be read (or skipped
+     * over) from this input stream without blocking by the next invocation of a
+     * method for this input stream. The next invocation might be the same
+     * thread or another thread. A single read or skip of this many bytes will
+     * not block, but may read or skip fewer bytes.
+     * <p>
+     * This method returns the sum of the number of bytes remaining to be read
+     * in the buffer (<code>count&nbsp;- pos</code>) and the result of
+     * calling the {@link java.io.FilterInputStream#in in}.available().
+     * 
+     * @return an estimate of the number of bytes that can be read (or skipped
+     *         over) from this input stream without blocking.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     */
+    public int available() throws IOException
+    {
+        return getInIfOpen().available() + (count - pos);
+    }
+    
+    /**
+     * See the general contract of the <code>mark</code> method of
+     * <code>InputStream</code>.
+     * 
+     * @param readlimit
+     *            the maximum limit of bytes that can be read before the mark
+     *            position becomes invalid.
+     * @see java.io.BufferedInputStream#reset()
+     */
+    public void mark(int readlimit)
+    {
+        marklimit = readlimit;
+        markpos = pos;
+    }
+    
+    /**
+     * See the general contract of the <code>reset</code> method of
+     * <code>InputStream</code>.
+     * <p>
+     * If <code>markpos</code> is <code>-1</code> (no mark has been set or
+     * the mark has been invalidated), an <code>IOException</code> is thrown.
+     * Otherwise, <code>pos</code> is set equal to <code>markpos</code>.
+     * 
+     * @exception IOException
+     *                if this stream has not been marked or, if the mark has
+     *                been invalidated, or the stream has been closed by
+     *                invoking its {@link #close()} method, or an I/O error
+     *                occurs.
+     * @see java.io.BufferedInputStream#mark(int)
+     */
+    public void reset() throws IOException
+    {
+        getBufIfOpen(); // Cause exception if closed
+        if (markpos < 0)
+            throw new IOException("Resetting to invalid mark");
+        pos = markpos;
+    }
+    
+    /**
+     * Tests if this input stream supports the <code>mark</code> and
+     * <code>reset</code> methods. The <code>markSupported</code> method of
+     * <code>BufferedInputStream</code> returns <code>true</code>.
+     * 
+     * @return a <code>boolean</code> indicating if this stream type supports
+     *         the <code>mark</code> and <code>reset</code> methods.
+     * @see java.io.InputStream#mark(int)
+     * @see java.io.InputStream#reset()
+     */
+    public boolean markSupported()
+    {
+        return true;
+    }
+    
+    /**
+     * Closes this input stream and releases any system resources associated
+     * with the stream. Once the stream has been closed, further read(),
+     * available(), reset(), or skip() invocations will throw an IOException.
+     * Closing a previously closed stream has no effect.
+     * 
+     * @exception IOException
+     *                if an I/O error occurs.
+     */
+    public void close() throws IOException
+    {
+        byte[] buffer;
+        while ((buffer = buf) != null)
+        {
+            if (bufUpdater.compareAndSet(this, buffer, null))
+            {
+                InputStream input = in;
+                in = null;
+                if (input != null)
+                    input.close();
+                return;
+            }
+            // Else retry in case a new buf was CASed in fill()
+        }
+    }
+}
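
A short usage sketch for the stream above (not part of the commit); the file name "data.db"
and the buffer and mark sizes are arbitrary placeholders:

    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.cassandra.io.FastBufferedInputStream;

    public class FastBufferedInputStreamExample
    {
        public static void main(String[] args) throws IOException
        {
            FastBufferedInputStream in =
                    new FastBufferedInputStream(new FileInputStream("data.db"), 16 * 1024);
            try
            {
                in.mark(1024);              // remember this position; valid for up to 1024 bytes
                int first = in.read();      // read a single byte through the buffer
                in.reset();                 // rewind to the marked position

                byte[] chunk = new byte[512];
                int n = in.read(chunk, 0, chunk.length);
                System.out.println("first byte " + first + ", then read " + n + " bytes");
            }
            finally
            {
                in.close();
            }
        }
    }

Since the class drops BufferedInputStream's method-level synchronization, a given instance
should only be used from one thread at a time.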

Added: incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+/**
+ * The class implements a buffered output stream. By setting up such an output
+ * stream, an application can write bytes to the underlying output stream
+ * without necessarily causing a call to the underlying system for each byte
+ * written.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedOutputStream extends FilterOutputStream
+{
+    /**
+     * The internal buffer where data is stored.
+     */
+    protected byte buf[];
+    
+    /**
+     * The number of valid bytes in the buffer. This value is always in the
+     * range <tt>0</tt> through <tt>buf.length</tt>; elements
+     * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte
+     * data.
+     */
+    protected int count;
+    
+    /**
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream.
+     * 
+     * @param out
+     *            the underlying output stream.
+     */
+    public FastBufferedOutputStream(OutputStream out)
+    {
+        this(out, 8192);
+    }
+    
+    /**
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream with the specified buffer size.
+     * 
+     * @param out
+     *            the underlying output stream.
+     * @param size
+     *            the buffer size.
+     * @exception IllegalArgumentException
+     *                if size &lt;= 0.
+     */
+    public FastBufferedOutputStream(OutputStream out, int size)
+    {
+        super(out);
+        if (size <= 0)
+        {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+    
+    /** Flush the internal buffer */
+    private void flushBuffer() throws IOException
+    {
+        if (count > 0)
+        {
+            out.write(buf, 0, count);
+            count = 0;
+        }
+    }
+    
+    /**
+     * Writes the specified byte to this buffered output stream.
+     * 
+     * @param b
+     *            the byte to be written.
+     * @exception IOException
+     *                if an I/O error occurs.
+     */
+    public void write(int b) throws IOException
+    {
+        if (count >= buf.length)
+        {
+            flushBuffer();
+        }
+        buf[count++] = (byte) b;
+    }
+    
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at
+     * offset <code>off</code> to this buffered output stream.
+     * 
+     * <p>
+     * Ordinarily this method stores bytes from the given array into this
+     * stream's buffer, flushing the buffer to the underlying output stream as
+     * needed. If the requested length is at least as large as this stream's
+     * buffer, however, then this method will flush the buffer and write the
+     * bytes directly to the underlying output stream. Thus redundant
+     * <code>BufferedOutputStream</code>s will not copy data unnecessarily.
+     * 
+     * @param b
+     *            the data.
+     * @param off
+     *            the start offset in the data.
+     * @param len
+     *            the number of bytes to write.
+     * @exception IOException
+     *                if an I/O error occurs.
+     */
+    public void write(byte b[], int off, int len)
+    throws IOException
+    {
+        if (len >= buf.length)
+        {
+            /*
+             * If the request length exceeds the size of the output buffer,
+             * flush the output buffer and then write the data directly. In this
+             * way buffered streams will cascade harmlessly.
+             */
+            flushBuffer();
+            out.write(b, off, len);
+            return;
+        }
+        if (len > buf.length - count)
+        {
+            flushBuffer();
+        }
+        System.arraycopy(b, off, buf, count, len);
+        count += len;
+    }
+    
+    /**
+     * Flushes this buffered output stream. This forces any buffered output
+     * bytes to be written out to the underlying output stream.
+     * 
+     * @exception IOException
+     *                if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    public void flush() throws IOException
+    {
+        flushBuffer();
+        out.flush();
+    }
+}
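
A minimal usage sketch for the buffered output stream above (not part of the commit); the
file name and buffer size are placeholders chosen for the example:

    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.cassandra.io.FastBufferedOutputStream;

    public class FastBufferedOutputStreamExample
    {
        public static void main(String[] args) throws IOException
        {
            FastBufferedOutputStream out =
                    new FastBufferedOutputStream(new FileOutputStream("out.db"), 32 * 1024);
            try
            {
                byte[] payload = "hello".getBytes();
                out.write(payload, 0, payload.length); // buffered; may not reach the file yet
                out.write('!');                        // a single byte goes into the buffer
                out.flush();                           // force buffered bytes to the file
            }
            finally
            {
                out.close();                           // FilterOutputStream.close() flushes first
            }
        }
    }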

Added: incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.DataInputStream;
+
+/**
+ * Allows for the controlled serialization/deserialization of a given type.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface ICompactSerializer<T>
+{
+    /**
+     * Serialize the specified type into the specified DataOutputStream instance.
+     * @param t type that needs to be serialized
+     * @param dos DataOutputStream into which serialization needs to happen.
+     * @throws IOException
+     */
+    public void serialize(T t, DataOutputStream dos) throws IOException;
+
+    /**
+     * Deserialize from the specified DataInputStream instance.
+     * @param dis DataInputStream from which deserialization needs to happen.
+     * @throws IOException
+     * @return the type that was deserialized
+     */
+    public T deserialize(DataInputStream dis) throws IOException;
+}
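
To show how the interface is meant to be implemented, here is a sketch for a hypothetical
two-field type; both the Pair class and PairSerializer are invented for the example and do
not appear in the commit:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.cassandra.io.ICompactSerializer;

    class Pair
    {
        final String name;
        final long value;

        Pair(String name, long value)
        {
            this.name = name;
            this.value = value;
        }
    }

    class PairSerializer implements ICompactSerializer<Pair>
    {
        public void serialize(Pair p, DataOutputStream dos) throws IOException
        {
            // Write the fields in a fixed order ...
            dos.writeUTF(p.name);
            dos.writeLong(p.value);
        }

        public Pair deserialize(DataInputStream dis) throws IOException
        {
            // ... and read them back in the same order.
            return new Pair(dis.readUTF(), dis.readLong());
        }
    }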

Added: incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.List;
+
+/**
+ * Interface to read from the SequenceFile abstraction.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFileReader
+{
+    public String getFileName();
+    public long getEOF() throws IOException;
+    public long getCurrentPosition() throws IOException;
+    public boolean isHealthyFileDescriptor() throws IOException;
+    public void seek(long position) throws IOException;
+    public boolean isEOF() throws IOException;
+
+    /**
+     * Be extremely careful while using this API. This is currently
+     * used to read the commit log header from the commit logs.
+     * Treat this as an internal API.
+     * 
+     * @param bytes read into this byte array.
+    */
+    public void readDirect(byte[] bytes) throws IOException;
+    
+    /**
+     * Read a long value from the underlying sub system.
+     * @return value read
+     * @throws IOException
+     */
+    public long readLong() throws IOException;
+    
+    /**
+     * This function is used to help out subsequent reads
+     * on the specified key. It reads the keys prior to this
+     * one on disk so that the buffer cache is hot.
+     * 
+     *  @param key key for which we are performing the touch.
+     *  @param fData true implies we fetch the data into buffer cache.
+    */
+    public long touch(String key , boolean fData) throws IOException;
+    
+    /**
+     * This method helps in retrieving the offset of the specified
+     * key in the file using the block index.
+     * 
+     * @param key key whose position we need in the block index.
+    */
+    public long getPositionFromBlockIndex(String key) throws IOException;
+    
+    /**
+     * This method returns the position of the specified key and the 
+     * size of its data segment from the block index.
+     * 
+     * @param key key whose block metadata we are interested in.
+     * @return an instance of the block metadata for this key.
+    */
+    public SSTable.BlockMetadata getBlockMetadata(String key) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOutputBuffer
+     * passed in.
+     *
+     * @param bufOut DataOutputBuffer that needs to be filled.
+     * @return number of bytes read.
+     * @throws IOException 
+    */
+    public long next(DataOutputBuffer bufOut) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOutputBuffer
+     * passed in.
+     *
+     * @param key key we are interested in.
+     * @param bufOut DataOutputBuffer that needs to be filled.
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return the number of bytes read.
+    */
+    public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOutputBuffer
+     * passed in.
+     *
+     * @param key key we are interested in.
+     * @param bufOut DataOutputBuffer that needs to be filled.
+     * @param column name of the column in our format.
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return number of bytes that were read.
+    */
+    public long next(String key, DataOutputBuffer bufOut, String column, Coordinate section) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOutputBuffer
+     * passed in. Always use this method to query for application
+     * specific data as it will have indexes.
+     *
+     * @param key key we are interested in.
+     * @param bufOut DataOutputBuffer that needs to be filled.
+     * @param columnFamilyName The name of the column family only, without the ":"
+     * @param columnNames The list of columns in the column family
+     *                    that we want to return
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return number of bytes read.
+     *
+    */
+    public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, Coordinate section) throws IOException;
+    
+    /**
+     * This method dumps the next key/value into the DataOutputBuffer
+     * passed in.
+     *
+     * @param key key we are interested in.
+     * @param bufOut DataOutputBuffer that needs to be filled.
+     * @param column name of the column in our format.
+     * @param timeRange time range we are interested in.
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return number of bytes that were read.
+    */
+    public long next(String key, DataOutputBuffer bufOut, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException;
+
+    /**
+     * Close the file after reading.
+     * @throws IOException
+     */
+    public void close() throws IOException;
+}
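
As an illustration of the read side of the interface, a sketch that drains every key/value
block from a reader using only the methods declared above; how a concrete IFileReader is
obtained (presumably from the SequenceFile abstraction mentioned in the javadoc) is outside
the scope of this sketch:

    import java.io.IOException;
    import org.apache.cassandra.io.DataOutputBuffer;
    import org.apache.cassandra.io.IFileReader;

    public class FileReaderScan
    {
        /** Reads to EOF and returns the total number of bytes consumed. */
        public static long scan(IFileReader reader) throws IOException
        {
            long totalBytes = 0;
            DataOutputBuffer bufOut = new DataOutputBuffer();
            try
            {
                while (!reader.isEOF())
                {
                    bufOut.reset();                    // reuse the same buffer for each block
                    long bytesRead = reader.next(bufOut);
                    if (bytesRead <= 0)
                        break;
                    totalBytes += bytesRead;
                }
            }
            finally
            {
                reader.close();
            }
            return totalBytes;
        }
    }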

Added: incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.cassandra.db.PrimaryKey;
+
+
+/**
+ * An interface for writing into the SequenceFile abstraction.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFileWriter
+{
+    /**
+     * Get the current position of the file pointer.
+     * @return current file pointer position
+     * @throws IOException
+     */
+    public long getCurrentPosition() throws IOException;
+    
+    /**
+     * @return the last file modification time.
+     */
+    public long lastModified();
+    
+    /**
+     * Seeks the file pointer to the specified position.
+     * @param position position within the file to seek to.
+     * @throws IOException
+     */
+    public void seek(long position) throws IOException;
+    
+    /**
+     * Appends the buffer to the underlying SequenceFile.
+     * @param buffer buffer which contains the serialized data.
+     * @throws IOException
+     */
+    public void append(DataOutputBuffer buffer) throws IOException;
+    
+    /**
+     * Appends the key and the value to the underlying SequenceFile.
+     * @param keyBuffer buffer which contains the serialized key.
+     * @param buffer buffer which contains the serialized data.
+     * @throws IOException
+     */
+    public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException;
+    
+    /**
+     * Appends the key and the value to the underlying SequenceFile.
+     * @param key key associated with this piece of data.
+     * @param buffer buffer containing the serialized data.
+     * @throws IOException
+     */
+    public void append(String key, DataOutputBuffer buffer) throws IOException;
+    
+    /**
+     * Appends the key and the value to the underlying SequenceFile.
+     * @param key key associated with this piece of data.
+     * @param value byte array containing the serialized data.
+     * @throws IOException
+     */
+    public void append(String key, byte[] value) throws IOException;
+    
+    /**
+     * Appends the key and the long value to the underlying SequenceFile.
+     * This is used in the construction of the index file associated with an
+     * SSTable.
+     * @param key key associated with this piece of data.
+     * @param value value associated with this key.
+     * @throws IOException
+     */
+    public void append(String key, long value) throws IOException;
+    
+    /**
+     * Be extremely careful while using this API. This is currently
+     * used to write the commit log header in the commit logs.
+     * If not used carefully, it could completely screw up reads
+     * of other key/value pairs that are written.
+     * 
+     * @param bytes serialized version of the commit log header.
+     * @throws IOException
+    */
+    public long writeDirect(byte[] bytes) throws IOException;
+    
+    /**
+     * Write a long into the underlying sub system.
+     * @param value long to be written
+     * @throws IOException
+     */
+    public void writeLong(long value) throws IOException;
+      
+    /**
+     * Close the file which is being used for the write.
+     * @throws IOException
+     */
+    public void close() throws IOException;  
+    
+    /**
+     * Close the file after appending the passed in footer information.
+     * @param footer footer information.
+     * @param size size of the footer.
+     * @throws IOException
+     */
+    public void close(byte[] footer, int size) throws IOException;
+    
+    /**
+     * @return the name of the file.
+     */
+    public String getFileName();    
+    
+    /**
+     * @return the size of the file.
+     * @throws IOException
+     */
+    public long getFileSize() throws IOException;    
+}
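
A matching sketch for the write side; the helper below only exercises methods declared in
the interface, and the way a concrete IFileWriter instance is obtained is again left out:

    import java.io.IOException;
    import org.apache.cassandra.io.DataOutputBuffer;
    import org.apache.cassandra.io.IFileWriter;

    public class FileWriterAppend
    {
        /**
         * Appends one key/value pair and returns the file position after the
         * append, which a caller could record in an index.
         */
        public static long appendRow(IFileWriter writer, String key, DataOutputBuffer value)
                throws IOException
        {
            writer.append(key, value);
            return writer.getCurrentPosition();
        }
    }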

Added: incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.TypeInfo;
+import org.apache.cassandra.io.SSTable.KeyPositionInfo;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+/**
+ * Provides helper to serialize, deserialize and use column indexes.
+ * Author : Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+
+public class IndexHelper
+{
+	/**
+	 * Serializes a column index to a data output stream
+	 * @param indexSizeInBytes Size of index to be written
+	 * @param columnIndexList List of column index entries as objects
+	 * @param dos the output stream into which the column index is to be written
+	 * @throws IOException
+	 */
+	public static void serialize(int indexSizeInBytes, List<ColumnIndexInfo> columnIndexList, DataOutputStream dos) throws IOException
+	{
+		/* if we have no data to index, then write that there is no index present */
+		if(indexSizeInBytes == 0 || columnIndexList == null || columnIndexList.size() == 0)
+		{
+			dos.writeBoolean(false);
+		}
+		else
+		{
+	        /* write if we are storing a column index */
+	    	dos.writeBoolean(true);
+	    	/* write the size of the index */
+	    	dos.writeInt(indexSizeInBytes);
+	        for( ColumnIndexInfo cIndexInfo : columnIndexList )
+	        {
+	        	cIndexInfo.serialize(dos);
+	        }
+		}
+	}
+    
+    /**
+     * Skip the bloom filter and the index and return the bytes read.
+     * @param in the data input from which the bloom filter and index 
+     *           should be skipped
+     * @return number of bytes read.
+     * @throws IOException
+     */
+    public static int skipBloomFilterAndIndex(DataInput in) throws IOException
+    {
+        int totalBytesRead = 0;
+        /* size of the bloom filter */
+        int size = in.readInt();
+        totalBytesRead += 4;
+        /* skip the serialized bloom filter */
+        in.skipBytes(size);
+        totalBytesRead += size;
+        /* skip the index on disk */
+        /* read if the file has column indexes */
+        boolean hasColumnIndexes = in.readBoolean();
+        totalBytesRead += 1;
+        if ( hasColumnIndexes )
+        {
+            totalBytesRead += skipIndex(in);
+        }
+        return totalBytesRead;
+    }
+    
+    /**
+     * Skip the bloom filter and return the bytes read.
+     * @param in the data input from which the bloom filter 
+     *           should be skipped
+     * @return number of bytes read.
+     * @throws IOException
+     */
+    public static int skipBloomFilter(DataInput in) throws IOException
+    {
+        int totalBytesRead = 0;
+        /* size of the bloom filter */
+        int size = in.readInt();
+        totalBytesRead += 4;
+        /* skip the serialized bloom filter */
+        in.skipBytes(size);
+        totalBytesRead += size;
+        return totalBytesRead;
+    }
+
+	/**
+	 * Skip the index and return the number of bytes read.
+	 * @param file the data input from which the index should be skipped
+	 * @return number of bytes read from the data input
+	 * @throws IOException
+	 */
+	public static int skipIndex(DataInput file) throws IOException
+	{
+        /* read only the column index list */
+        int columnIndexSize = file.readInt();
+        int totalBytesRead = 4;
+
+        /* skip the column index data */
+        file.skipBytes(columnIndexSize);
+        totalBytesRead += columnIndexSize;
+
+        return totalBytesRead;
+	}
+    
+    /**
+     * Deserialize the index into a structure and return the number of bytes read.
+     * @param cfName the column family the index belongs to; determines the index entry type
+     * @param in input from which the serialized form of the index is read
+     * @param columnIndexList the structure which is filled in with the deserialized index
+     * @return number of bytes read from the input
+     * @throws IOException
+     */
+	static int deserializeIndex(String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException
+	{
+		/* read only the column index list */
+		int columnIndexSize = in.readInt();
+		int totalBytesRead = 4;
+
+		/* read the indexes into a separate buffer */
+		DataOutputBuffer indexOut = new DataOutputBuffer();
+        /* write the data into buffer */
+		indexOut.write(in, columnIndexSize);
+		totalBytesRead += columnIndexSize;
+
+		/* now deserialize the index list */
+        DataInputBuffer indexIn = new DataInputBuffer();
+        indexIn.reset(indexOut.getData(), indexOut.getLength());
+        
+        TypeInfo typeInfo = DatabaseDescriptor.getTypeInfo(cfName);
+        if ( DatabaseDescriptor.getColumnFamilyType(cfName).equals("Super") || DatabaseDescriptor.isNameSortingEnabled(cfName) )
+        {
+            typeInfo = TypeInfo.STRING;
+        }
+        
+        while(indexIn.available() > 0)
+        {            
+            ColumnIndexInfo cIndexInfo = ColumnIndexFactory.instance(typeInfo);
+        	cIndexInfo = cIndexInfo.deserialize(indexIn);
+        	columnIndexList.add(cIndexInfo);
+        }
+
+		return totalBytesRead;
+	}
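+
+    /*
+     * Note (illustrative, not part of the original code): serialize() writes a
+     * boolean "index present" flag followed by the index size and the entries,
+     * while deserializeIndex() starts at the size. A caller therefore appears
+     * to be expected to have consumed that flag before invoking this method.
+     */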
+
+    /**
+     * Returns the range in which a given column falls in the index.
+     * @param cIndexInfo index entry for the column whose range needs to be found
+     * @param columnIndexList the in-memory representation of the column index
+     * @param dataSize the total size of the data
+     * @param totalNumCols total number of columns
+     * @return an object describing a subrange in which the column is serialized
+     */
+	static ColumnRange getColumnRangeFromNameIndex(IndexHelper.ColumnIndexInfo cIndexInfo, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
+	{
+		/* find the offset for the column */
+        int size = columnIndexList.size();
+        long start = 0;
+        long end = dataSize;
+        int numColumns = 0;      
+       
+        int index = Collections.binarySearch(columnIndexList, cIndexInfo);
+        if ( index < 0 )
+        {
+            /*
+             * The requested column is not itself an index entry. binarySearch
+             * returns (-(insertion point) - 1), so convert that back to the
+             * insertion point.
+             */
+            index = (++index)*(-1);
+        }
+        else
+        {
+        	++index;
+        }
+
+        /* calculate the starting offset from which we have to read */
+        start = (index == 0) ? 0 : columnIndexList.get(index - 1).position();
+
+        if( index < size )
+        {
+        	end = columnIndexList.get(index).position();
+            numColumns = columnIndexList.get(index).count();            
+        }
+        else
+        {
+        	end = dataSize;  
+            int totalColsIndexed = 0;
+            for( IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList )
+            {
+                totalColsIndexed += colPosInfo.count();
+            }
+            numColumns = totalNumCols - totalColsIndexed;
+        }
+
+        return new ColumnRange(start, end, numColumns);
+	}
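+
+    /*
+     * Worked example (illustrative, not part of the original code): suppose
+     * the deserialized index holds three entries
+     *
+     *   [ ("c", position=100, count=10), ("f", position=200, count=10), ("k", position=300, count=10) ]
+     *
+     * A probe for "d" makes Collections.binarySearch() return -2, which the
+     * code above converts to insertion point 1; the returned ColumnRange then
+     * spans [entry 0's position, entry 1's position) = [100, 200) with entry
+     * 1's count (10). An exact name match advances past the matched entry
+     * instead, and a probe beyond the last entry falls back to
+     * [last position, dataSize) with the leftover count (totalNumCols minus
+     * the indexed counts).
+     */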
+
+	/**
+	 * Returns the sub-ranges that contain the list of columns in columnNames.
+	 * @param columnNames The list of columns whose subranges need to be found
+	 * @param columnIndexList the deserialized column indexes
+	 * @param dataSize the total size of data
+	 * @param totalNumCols the total number of columns
+	 * @return a list of subranges which contain all the columns in columnNames
+	 */
+	static List<ColumnRange> getMultiColumnRangesFromNameIndex(List<String> columnNames, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
+	{
+		List<ColumnRange> columnRanges = new ArrayList<ColumnRange>();				
+
+        if ( columnIndexList.size() == 0 )
+        {
+            columnRanges.add( new ColumnRange(0, dataSize, totalNumCols) );
+        }
+        else
+        {
+            Map<Long, Boolean> offset = new HashMap<Long, Boolean>();
+    		for(String column : columnNames)
+    		{
+                IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnNameIndexInfo(column);
+    			ColumnRange columnRange = getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols);   
+                if ( offset.get( columnRange.coordinate().start_ ) == null ) 
+                {
+                    columnRanges.add(columnRange);
+                    offset.put(columnRange.coordinate().start_, true);
+                }
+    		}
+        }
+
+		return columnRanges;
+	}
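+
+    /*
+     * Illustrative note (not part of the original code): if several requested
+     * columns resolve to the same chunk, the start-offset map above collapses
+     * them into a single ColumnRange. With the three-entry index shown in the
+     * earlier example, asking for "d" and "e" yields one range [100, 200)
+     * rather than two identical ones.
+     */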
+    
+    /**
+     * Returns the range in which a given time range falls in the index. This
+     * is used when time range queries are in play, for instance when we are
+     * looking for columns in the range [t1, t2].
+     * @param timeRange the time range we are interested in
+     * @param columnIndexList the in-memory representation of the column index
+     * @param dataSize the total size of the data
+     * @param totalNumCols total number of columns
+     * @return an object describing a subrange in which the matching columns are serialized
+     */
+    static ColumnRange getColumnRangeFromTimeIndex(IndexHelper.TimeRange timeRange, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
+    {
+        /* if column indexes were not present for this column family, then handle accordingly */
+        if(columnIndexList.size() == 0)
+        {
+            return new ColumnRange(0, dataSize, totalNumCols);
+        }
+
+        /* find the offset for the column */
+        int size = columnIndexList.size();
+        long start = 0;
+        long end = dataSize;
+        int numColumns = 0;      
+       
+        /*
+         * Time indices are sorted in descending order, so
+         * we need to apply a reverse comparator for the
+         * binary search.
+         */
+        Comparator<IndexHelper.ColumnIndexInfo> comparator = Collections.reverseOrder();
+        IndexHelper.ColumnIndexInfo rhs = IndexHelper.ColumnIndexFactory.instance(TypeInfo.LONG);
+        rhs.set(timeRange.rhs());
+        int index = Collections.binarySearch(columnIndexList, rhs, comparator);
+        if ( index < 0 )
+        {
+            /*
+             * The probed timestamp is not itself an index entry; convert the
+             * binarySearch result (-(insertion point) - 1) back to the
+             * insertion point.
+             */
+            index = (++index)*(-1);
+        }
+        else
+        {
+            ++index;
+        }
+
+        /*
+         * Calculate the starting offset from which we have to read. We
+         * perform the probe using the bigger timestamp and then scan the
+         * column position chunks until we reach the lower timestamp of the
+         * time range.
+         */
+        start = (index == 0) ? 0 : columnIndexList.get(index - 1).position();
+        /* add the number of columns in the first chunk */
+        numColumns += (index == 0) ? columnIndexList.get(0).count() : columnIndexList.get(index - 1).count();
+        if( index < size )
+        {            
+            int chunks = columnIndexList.size();
+            /* Index info for the lower bound of the time range */
+            IndexHelper.ColumnIndexInfo lhs = IndexHelper.ColumnIndexFactory.instance(TypeInfo.LONG);
+            lhs.set(timeRange.lhs());
+            int i = index + 1;
+            for ( ; i < chunks; ++i )
+            {
+                IndexHelper.ColumnIndexInfo cIndexInfo2 = columnIndexList.get(i);
+                if ( cIndexInfo2.compareTo(lhs) < 0 )
+                {
+                    numColumns += cIndexInfo2.count();
+                    break;
+                } 
+                numColumns += cIndexInfo2.count();
+            }
+            
+            /* if we scanned past the last chunk, fall back to the end of the data */
+            end = ( i < chunks ) ? columnIndexList.get(i).position() : dataSize;
+        }
+        else
+        {
+            end = dataSize;  
+            int totalColsIndexed = 0;
+            for( IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList )
+            {
+                totalColsIndexed += colPosInfo.count();
+            }
+            numColumns = totalNumCols - totalColsIndexed;
+        }
+       
+        return new ColumnRange(start, end, numColumns);
+    }    
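+
+    /*
+     * Worked example (illustrative, not part of the original code): with index
+     * entries kept in descending timestamp order, say
+     *
+     *   [ (t=900, position=100, count=10), (t=600, position=200, count=10), (t=300, position=300, count=10) ]
+     *
+     * a query for the time range [400, 800] probes with the upper bound 800
+     * and lands at insertion point 1. Reading starts at entry 0's position
+     * (100); the loop then scans forward from entry 2, adding chunk counts,
+     * and stops once it crosses a timestamp below the lower bound 400, so the
+     * end offset becomes that entry's position (300).
+     */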
+    
+    public static class ColumnIndexFactory
+    {
+        public static ColumnIndexInfo instance(TypeInfo typeInfo)
+        {
+            ColumnIndexInfo cIndexInfo = null;
+            switch(typeInfo)
+            {
+                case STRING:
+                    cIndexInfo = new ColumnNameIndexInfo();
+                    break;
+                    
+                case LONG:
+                    cIndexInfo = new ColumnTimestampIndexInfo();
+                    break;
+            }
+            return cIndexInfo;
+        }    
+    }
+    
+    /**
+     * Encapsulates a time range. Queries use
+     * this abstraction to indicate the start
+     * and end of a time filter.
+     *
+     * @author alakshman
+     *
+     */
+    public static class TimeRange
+    {
+        private long lhs_;
+        private long rhs_;
+        
+        public TimeRange(long lhs, long rhs)
+        {
+            lhs_ = lhs;
+            rhs_ = rhs;
+        }
+        
+        public long lhs()
+        {
+            return lhs_;
+        }
+        
+        public long rhs()
+        {
+            return rhs_;
+        }
+    }
+    
+    /**
+     * A column range containing the start and end
+     * offset of the appropriate column index chunk
+     * and the number of columns in that chunk.
+     * @author alakshman
+     *
+     */
+    public static class ColumnRange
+    {
+        private Coordinate coordinate_;
+        private int columnCount_;
+        
+        ColumnRange(long start, long end, int columnCount)
+        {
+            coordinate_ = new Coordinate(start, end);
+            columnCount_ = columnCount;
+        }
+        
+        Coordinate coordinate()
+        {
+            return coordinate_;
+        }
+        
+        int count()
+        {
+            return columnCount_;
+        }                
+    }
+
+    /**
+     * Base class for a column index entry. Each entry records the
+     * position of an index chunk on disk and the number of columns
+     * in that chunk.
+     */
+    public static abstract class ColumnIndexInfo implements Comparable<ColumnIndexInfo>
+    {
+        private long position_;
+        private int columnCount_;        
+        
+        ColumnIndexInfo(long position, int columnCount)
+        {
+            position_ = position;
+            columnCount_ = columnCount;
+        }
+                
+        public long position()
+        {
+            return position_;
+        }
+        
+        public void position(long position)
+        {
+            position_ = position;
+        }
+        
+        int count()
+        {
+            return columnCount_;
+        }
+        
+        public void count(int count)
+        {
+            columnCount_ = count;
+        }
+                
+        public abstract void set(Object o);
+        public abstract void serialize(DataOutputStream dos) throws IOException;
+        public abstract ColumnIndexInfo deserialize(DataInputStream dis) throws IOException;
+        
+        public int size()
+        {
+            /* size of long for "position_"  + size of columnCount_ */
+            return (8 + 4);
+        }
+    }
+
+    static class ColumnNameIndexInfo extends ColumnIndexInfo
+    {
+        private String name_;       
+        
+        ColumnNameIndexInfo()
+        {
+            super(0L, 0);
+        }
+        
+        ColumnNameIndexInfo(String name)
+        {
+            this(name, 0L, 0);
+        }
+                
+        ColumnNameIndexInfo(String name, long position, int columnCount)
+        {
+            super(position, columnCount);
+            name_ = name;
+        }
+        
+        String name()
+        {
+            return name_;
+        }                
+        
+        public void set(Object o)
+        {
+            name_ = (String)o;
+        }
+        
+        public int compareTo(ColumnIndexInfo rhs)
+        {
+            IndexHelper.ColumnNameIndexInfo cIndexInfo = (IndexHelper.ColumnNameIndexInfo)rhs;
+            return name_.compareTo(cIndexInfo.name_);
+        }
+        
+        public void serialize(DataOutputStream dos) throws IOException
+        {
+            dos.writeLong(position()); 
+            dos.writeInt(count());
+            dos.writeUTF(name_);        
+        }
+        
+        public ColumnNameIndexInfo deserialize(DataInputStream dis) throws IOException
+        {
+            long position = dis.readLong();
+            int columnCount = dis.readInt();            
+            String name = dis.readUTF();       
+            return new ColumnNameIndexInfo(name, position, columnCount);
+        }
+        
+        public int size()
+        {
+            int size = super.size();
+            /* size of name_ as UTF-8 plus the length prefix written by writeUTF */
+            size += FBUtilities.getUTF8Length(name_) + IColumn.UtfPrefix_;
+            return size;
+        }
+    }
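+
+    /*
+     * Illustrative round trip (not part of the original code): an entry
+     * serialized with ColumnNameIndexInfo.serialize() can be read back with
+     * deserialize(), and size() reports the bytes it occupies on disk.
+     *
+     *   ColumnNameIndexInfo written = new ColumnNameIndexInfo("col_a", 1024L, 128);
+     *   DataOutputBuffer out = new DataOutputBuffer();
+     *   written.serialize(out);
+     *   DataInputBuffer in = new DataInputBuffer();
+     *   in.reset(out.getData(), out.getLength());
+     *   ColumnNameIndexInfo read = new ColumnNameIndexInfo().deserialize(in);
+     *   assert read.name().equals("col_a") && read.position() == 1024L;
+     */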
+
+    static class ColumnTimestampIndexInfo extends ColumnIndexInfo
+    {
+        private long timestamp_;
+        
+        ColumnTimestampIndexInfo()
+        {
+            super(0L, 0);
+        }
+        
+        ColumnTimestampIndexInfo(long timestamp)
+        {
+            this(timestamp, 0L, 0);  
+        }
+        
+        ColumnTimestampIndexInfo(long timestamp, long position, int columnCount)
+        {
+            super(position, columnCount);
+            timestamp_ = timestamp;
+        }
+        
+        public long timestamp()
+        {
+            return timestamp_;
+        }
+        
+        public void set(Object o)
+        {
+            timestamp_ = (Long)o;
+        }
+        
+        public int compareTo(ColumnIndexInfo rhs)
+        {
+            ColumnTimestampIndexInfo cIndexInfo = (ColumnTimestampIndexInfo)rhs;
+            return Long.valueOf(timestamp_).compareTo(Long.valueOf(cIndexInfo.timestamp_));
+        }
+        
+        public void serialize(DataOutputStream dos) throws IOException
+        {
+            dos.writeLong(position()); 
+            dos.writeInt(count());
+            dos.writeLong(timestamp_);        
+        }
+        
+        public ColumnTimestampIndexInfo deserialize(DataInputStream dis) throws IOException
+        {
+            long position = dis.readLong();
+            int columnCount = dis.readInt();
+            long timestamp = dis.readLong();        
+            return new ColumnTimestampIndexInfo(timestamp, position, columnCount);
+        }
+        
+        public int size()
+        {
+            int size = super.size();
+            /* add the size of the timestamp which is a long */ 
+            size += 8;
+            return size;
+        }
+    }
+}