You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC

svn commit: r749218 [19/34] - in /incubator/cassandra: branches/ dist/ nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/ trunk/src/org/apache/ trunk/src/org/apache/cassandra/ trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+
+/**
+ * A <code>ChecksumRandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access. Whenever dirty buffer contents are
+ * written to disk their checksums are logged through
+ * <code>ChecksumManager.logChecksum</code>, and whenever a buffer is loaded
+ * from disk it is passed to <code>ChecksumManager.validateChecksum</code>.
+ * <P>
+ * 
+ * Note: The operations on this class are unmonitored and unsynchronized.
+ * Also, the correct functioning of the <code>RandomAccessFile</code> methods
+ * that are not overridden here relies on the implementation of those methods
+ * in the superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class ChecksumRandomAccessFile extends RandomAccessFile
+{ 
+    /** Which checksum action {@link #doChecksumOperation} performs. */
+    private static enum ChecksumOperations
+    {
+        /** Record checksums for data that was just written to disk. */
+        LOG,
+        /** Validate data that was just read from disk. */
+        VERIFY
+    }
+    
+    static final int LogBuffSz_ = 16; // 64K buffer
+    private static final int checksumSz_ = (1 << LogBuffSz_); // 64K: bytes covered by one checksum
+    public static final int BuffSz_ = (1 << LogBuffSz_);
+    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+    
+    /*
+     * This implementation is based on the buffer implementation in Modula-3's
+     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+     */
+    private boolean dirty_; // true iff unflushed bytes exist
+    private boolean closed_; // true iff the file is closed
+    private long curr_; // current position in file
+    private long lo_, hi_; // bounds on characters in "buff"
+    private byte[] buff_; // local buffer
+    private long maxHi_; // this.lo + this.buff.length
+    private boolean hitEOF_; // buffer contains last file block?
+    private long diskPos_; // disk position
+    private String filename_; // file name
+    
+    /*
+     * To describe the above fields, we introduce the following abstractions for
+     * the file "f":
+     * 
+     * len(f) the length of the file curr(f) the current position in the file
+     * c(f) the abstract contents of the file disk(f) the contents of f's
+     * backing disk file closed(f) true iff the file is closed
+     * 
+     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+     * operation has the effect of making "disk(f)" identical to "c(f)".
+     * 
+     * A file is said to be *valid* if the following conditions hold:
+     * 
+     * V1. The "closed" and "curr" fields are correct:
+     * 
+     * f.closed == closed(f) f.curr == curr(f)
+     * 
+     * V2. The current position is either contained in the buffer, or just past
+     * the buffer:
+     * 
+     * f.lo <= f.curr <= f.hi
+     * 
+     * V3. Any (possibly) unflushed characters are stored in "f.buff":
+     * 
+     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+     * 
+     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+     * 
+     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+     * disk(f)[i])
+     * 
+     * V5. "f.dirty" is true iff the buffer contains bytes that should be
+     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+     * 
+     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+     * 
+     * V6. this.maxHi == this.lo + this.buff.length
+     * 
+     * Note that "f.buff" can be "null" in a valid file, since the range of
+     * characters in V3 is empty when "f.lo == f.curr".
+     * 
+     * A file is said to be *ready* if the buffer contains the current position,
+     * i.e., when:
+     * 
+     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+     * 
+     * When a file is ready, reading or writing a single byte can be performed
+     * by reading or writing the in-memory buffer without performing a disk
+     * operation.
+     */
+    
+    /**
+     * Open a new <code>ChecksumRandomAccessFile</code> on <code>file</code>
+     * in mode <code>mode</code>, which should be "r" for reading only, or
+     * "rw" for reading and writing. Uses a buffer of <code>BuffSz_</code> bytes.
+     */
+    public ChecksumRandomAccessFile(File file, String mode) throws IOException
+    {
+        super(file, mode);
+        this.init(file.getAbsolutePath(), 0);
+    }
+    
+    /**
+     * Like {@link #ChecksumRandomAccessFile(File, String)}, but with a buffer of
+     * <code>size</code> bytes; sizes not larger than <code>BuffSz_</code> fall
+     * back to <code>BuffSz_</code>.
+     */
+    public ChecksumRandomAccessFile(File file, String mode, int size) throws IOException
+    {
+        super(file, mode);
+        this.init(file.getAbsolutePath(), size);
+    }
+    
+    /**
+     * Open a new <code>ChecksumRandomAccessFile</code> on the file named
+     * <code>name</code> in mode <code>mode</code>, which should be "r" for
+     * reading only, or "rw" for reading and writing. Uses a buffer of
+     * <code>BuffSz_</code> bytes.
+     */
+    public ChecksumRandomAccessFile(String name, String mode) throws IOException
+    {
+        super(name, mode);
+        this.init(name, 0);
+    }
+    
+    /**
+     * Like {@link #ChecksumRandomAccessFile(String, String)}, but with a buffer
+     * of <code>size</code> bytes; sizes not larger than <code>BuffSz_</code>
+     * fall back to <code>BuffSz_</code>.
+     */
+    public ChecksumRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
+    {
+        super(name, mode);
+        this.init(name, size);
+    }
+    
+    /** Resets all buffer state for a freshly opened file named <code>name</code>. */
+    private void init(String name, int size)
+    {
+        this.dirty_ = this.closed_ = false;
+        this.lo_ = this.curr_ = this.hi_ = 0;
+        this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
+        // V6: maxHi == lo + buff.length -- the buffer may be larger than BuffSz_.
+        this.maxHi_ = this.lo_ + (long) this.buff_.length;
+        this.hitEOF_ = false;
+        this.diskPos_ = 0L;
+        this.filename_ = name;
+    }
+    
+    /**
+     * Flushes any dirty bytes and closes the underlying file. The file
+     * descriptor is released even if the final flush throws.
+     */
+    public void close() throws IOException
+    {
+        try
+        {
+            this.flush();
+        }
+        finally
+        {
+            this.closed_ = true;
+            super.close();
+        }
+    }
+    
+    /**
+     * Flush any bytes in the file's buffer that have not yet been written to
+     * disk. If the file was created read-only, this method is a no-op.
+     */
+    public void flush() throws IOException
+    {
+        this.flushBuffer();
+    }
+    
+    /**
+     * Logs or validates checksums for the checksumSz_-sized pieces of buff_
+     * that cover the buffer-sized chunk ending at diskPos_.
+     * NOTE(review): chunk ids are derived from diskPos_ / buff_.length, while
+     * seek() aligns lo_ to BuffSz_; these only agree when the buffer size is
+     * BuffSz_. Buffers whose length is not a multiple of checksumSz_ also look
+     * unsupported -- confirm against ChecksumManager.
+     */
+    private void doChecksumOperation(ChecksumOperations chksumOps) throws IOException
+    {        
+        int buffSz = buff_.length;
+        /*
+         * If diskPos_ sits exactly on a buffer boundary, step back one byte so
+         * the position is attributed to the buffer that just ended.
+         */
+        long currentPosition = ((diskPos_ % buffSz) == 0) ? diskPos_ - 1 : diskPos_;
+        /* 1-based index of the buffered chunk containing currentPosition. */
+        long chunk = (currentPosition / buffSz) + 1; 
+        /* Number of checksum chunks within a buffer. */
+        int chksumChunks = buffSz / checksumSz_;  
+        /* Position of the start of that buffered chunk. */
+        long pos = (chunk == 0) ? 0 : (chunk - 1) * buffSz;
+        int startOffset = 0;
+        int chksumChunkId = (int) (chksumChunks * (chunk - 1) + 1);
+        /* Loop-invariant; the do/while body below always runs at least once. */
+        int fId = SequenceFile.getFileId(filename_);
+        do
+        {            
+            switch (chksumOps)
+            {
+                case LOG:                    
+                    ChecksumManager.instance(filename_).logChecksum(fId, chksumChunkId++, buff_, startOffset, checksumSz_);
+                    break;
+                case VERIFY:
+                    ChecksumManager.instance(filename_).validateChecksum(filename_, chksumChunkId++, buff_, startOffset, checksumSz_);
+                    break;
+            }
+            pos += checksumSz_;
+            startOffset += checksumSz_;
+        }
+        while (pos < currentPosition);
+    }
+    
+    /**
+     * Flush any dirty bytes in the buffer to disk, then log checksums for the
+     * buffer that was just written.
+     */
+    private void flushBuffer() throws IOException
+    {
+        if (this.dirty_)
+        {
+            if (this.diskPos_ != this.lo_)
+                super.seek(this.lo_);
+            int len = (int) (this.curr_ - this.lo_);            
+            super.write(this.buff_, 0, len);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+            /* checksum the data that was just written to disk */
+            doChecksumOperation(ChecksumOperations.LOG);
+        }
+    }
+    
+    /**
+     * Read at most "this.buff.length" bytes into "this.buff", returning the
+     * number of bytes read. If the return result is less than
+     * "this.buff.length", then EOF was read.
+     */
+    private int fillBuffer() throws IOException
+    {
+        int cnt = 0;
+        int rem = this.buff_.length;
+        int n = -1;
+        while (rem > 0)
+        {
+            n = super.read(this.buff_, cnt, rem);
+            if (n < 0)
+                break;
+            cnt += n;
+            rem -= n;
+        }
+        if (this.hitEOF_ = (cnt < this.buff_.length))
+        {
+            // fill the part of the buffer that wasn't read with (byte) -1
+            Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);             
+        }                
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+    
+    /**
+     * This method positions <code>this.curr</code> at position <code>pos</code>.
+     * If <code>pos</code> does not fall in the current buffer, it flushes the
+     * current buffer, loads the correct one and validates its checksums.<p>
+     * 
+     * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
+     * is at or past the end-of-file, which can only happen if the file was
+     * opened in read-only mode.
+     */
+    public void seek(long pos) throws IOException
+    {
+        if (pos >= this.hi_ || pos < this.lo_)
+        {
+            // seeking outside of current buffer -- flush and read
+            this.flushBuffer();
+            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+            this.maxHi_ = this.lo_ + (long) this.buff_.length;
+            if (this.diskPos_ != this.lo_)
+            {
+                super.seek(this.lo_);
+                this.diskPos_ = this.lo_;
+            }
+            int n = this.fillBuffer();
+            if (n > 0)
+            {
+                doChecksumOperation(ChecksumOperations.VERIFY);
+            }
+            this.hi_ = this.lo_ + (long) n;
+        }
+        else
+        {
+            // seeking inside current buffer -- no read required
+            if (pos < this.curr_)
+            {
+                // if seeking backwards, we must flush to maintain V4
+                this.flushBuffer();
+            }
+        }
+        this.curr_ = pos;
+    }
+    
+    public long getFilePointer()
+    {
+        return this.curr_;
+    }
+    
+    public long length() throws IOException
+    {
+        return Math.max(this.curr_, super.length());
+    }
+    
+    /** Reads one byte, returning it as 0..255, or -1 at end of file. */
+    public int read() throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            // test for EOF
+            if (this.hitEOF_)
+                return -1;
+            
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        byte res = this.buff_[(int) (this.curr_ - this.lo_)];
+        this.curr_++;
+        return ((int) res) & 0xFF; // convert byte -> int
+    }
+    
+    public int read(byte[] b) throws IOException
+    {
+        return this.read(b, 0, b.length);
+    }
+    
+    /**
+     * Reads up to <code>len</code> bytes, never crossing the current buffer.
+     * Returns the number of bytes read, -1 at end of file, or 0 when
+     * <code>len</code> is 0. Returning 0 matters because
+     * <code>RandomAccessFile.readFully</code> always calls read() at least
+     * once, even for a zero-length request.
+     */
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        if (len == 0)
+            return 0;
+        if (this.curr_ >= this.hi_)
+        {
+            // test for EOF
+            if (this.hitEOF_)
+                return -1;
+            
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(this.buff_, buffOff, b, off, len);
+        this.curr_ += len;
+        return len;
+    }
+    
+    public void write(int b) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_)
+            {
+                // at EOF -- bump "hi"
+                this.hi_++;
+            }
+            else
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_++;
+                }
+            }
+        }
+        this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+        this.curr_++;
+        this.dirty_ = true;
+    }
+    
+    public void write(byte[] b) throws IOException
+    {
+        this.write(b, 0, b.length);
+    }
+    
+    /**
+     * Writes <code>len</code> bytes of <code>b</code> starting at
+     * <code>off</code>. A zero-length write leaves the buffer clean. Otherwise
+     * a later flush would rewrite the buffer and re-log checksums computed over
+     * bytes that were never written.
+     */
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        if (len <= 0)
+            return;
+        while (len > 0)
+        {
+            int n = this.writeAtMost(b, off, len);
+            off += n;
+            len -= n;
+        }
+        this.dirty_ = true;
+    }
+    
+    /*
+     * Write at most "len" bytes from "b" starting at position "off" into the
+     * buffer, and return the number of bytes written.
+     */
+    private int writeAtMost(byte[] b, int off, int len) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_)
+            {
+                // at EOF -- bump "hi"
+                this.hi_ = this.maxHi_;
+            }
+            else
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_ = this.maxHi_;
+                }
+            }
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(b, off, this.buff_, buffOff, len);
+        this.curr_ += len;
+        return len;
+    }
+    
+    /* Manual smoke test; relies on hard-coded local Windows paths. */
+    public static void main(String[] args) throws Throwable
+    {
+        
+        RandomAccessFile raf = new ChecksumRandomAccessFile("C:\\Engagements\\Table-ColumnFamily-1-Data.dat", "rw");
+        byte[] bytes = new byte[32*1024];
+        Random random = new Random();
+        
+        for ( int i = 0; i < 16; ++i )
+        {
+            random.nextBytes(bytes);
+            raf.write(bytes);
+        }
+        raf.close();
+        
+        String file = "C:\\Engagements\\Checksum-1.db";
+        ChecksumManager.instance("C:\\Engagements\\Table-ColumnFamily-1-Data.dat", file);
+        
+        raf = new ChecksumRandomAccessFile("C:\\Engagements\\Table-ColumnFamily-1-Data.dat", "rw", 4*1024*1024);
+        bytes = new byte[32*1024];
+        
+        for ( int i = 0; i < 16; ++i )
+        {           
+            raf.readFully(bytes);
+        }
+        raf.close();
+       
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+/**
+ * A section of a file that needs to be scanned, described by a start and an
+ * end position. NOTE(review): units and end inclusivity are not visible
+ * here -- confirm with callers.
+ */
+class Coordinate
+{
+    /* Where the section begins. */
+    long start_;
+    /* Where the section ends. */
+    long end_;
+
+    Coordinate(long start, long end)
+    {
+        this.start_ = start;
+        this.end_ = end;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+/**
+ * An implementation of the DataInputStream interface. This instance is completely thread 
+ * unsafe.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class DataInputBuffer extends DataInputStream
+{
+    /*
+     * This is a clone of the ByteArrayInputStream class w/o any
+     * method being synchronized.
+    */
+    public static class FastByteArrayInputStream extends InputStream
+    {             
+        /**
+         * An array of bytes that was provided by the creator of the stream.
+         * Elements <code>buf[0]</code> through <code>buf[count-1]</code> are
+         * the only bytes that can ever be read from the stream; element
+         * <code>buf[pos]</code> is the next byte to be read.
+         */
+        protected byte buf[];
+
+        /**
+         * The index of the next character to read from the input stream buffer.
+         * This value should always be nonnegative and not larger than the value of
+         * <code>count</code>. The next byte to be read from the input stream
+         * buffer will be <code>buf[pos]</code>.
+         */
+        protected int pos;
+
+        /**
+         * The currently marked position in the stream. ByteArrayInputStream objects
+         * are marked at position zero by default when constructed. They may be
+         * marked at another position within the buffer by the <code>mark()</code>
+         * method. The current buffer position is set to this point by the
+         * <code>reset()</code> method.
+         * <p>
+         * If no mark has been set, then the value of mark is the offset passed to
+         * the constructor (or 0 if the offset was not supplied).
+         * 
+         * @since JDK1.1
+         */
+        protected int mark = 0;
+
+        /**
+         * The index one greater than the last valid character in the input stream
+         * buffer. This value should always be nonnegative and not larger than the
+         * length of <code>buf</code>. It is one greater than the position of the
+         * last byte within <code>buf</code> that can ever be read from the input
+         * stream buffer.
+         */
+        protected int count;
+        
+        public FastByteArrayInputStream()
+        {
+            buf = new byte[0];
+        }
+
+        /**
+         * Creates a <code>ByteArrayInputStream</code> so that it uses
+         * <code>buf</code> as its buffer array. The buffer array is not copied.
+         * The initial value of <code>pos</code> is <code>0</code> and the
+         * initial value of <code>count</code> is the length of <code>buf</code>.
+         * 
+         * @param buf
+         *            the input buffer.
+         */
+        public FastByteArrayInputStream(byte buf[])
+        {
+            this.buf = buf;
+            this.pos = 0;
+            this.count = buf.length;
+        }
+
+        /**
+         * Creates <code>ByteArrayInputStream</code> that uses <code>buf</code>
+         * as its buffer array. The initial value of <code>pos</code> is
+         * <code>offset</code> and the initial value of <code>count</code> is
+         * the minimum of <code>offset+length</code> and <code>buf.length</code>.
+         * The buffer array is not copied. The buffer's mark is set to the specified
+         * offset.
+         * 
+         * @param buf
+         *            the input buffer.
+         * @param offset
+         *            the offset in the buffer of the first byte to read.
+         * @param length
+         *            the maximum number of bytes to read from the buffer.
+         */
+        public FastByteArrayInputStream(byte buf[], int offset, int length)
+        {
+            this.buf = buf;
+            this.pos = offset;
+            this.count = Math.min(offset + length, buf.length);
+            this.mark = offset;
+        }
+        
+        public final void setBytes(byte[] bytes)
+        {
+            buf = bytes;
+            pos = 0;
+            count = bytes.length;            
+        }
+
+        /**
+         * Reads the next byte of data from this input stream. The value byte is
+         * returned as an <code>int</code> in the range <code>0</code> to
+         * <code>255</code>. If no byte is available because the end of the
+         * stream has been reached, the value <code>-1</code> is returned.
+         * <p>
+         * This <code>read</code> method cannot block.
+         * 
+         * @return the next byte of data, or <code>-1</code> if the end of the
+         *         stream has been reached.
+         */
+        public final int read()
+        {                  
+            return (pos < count) ? ( buf[pos++] & 0xFF ) : -1;            
+        }
+
+        /**
+         * Reads up to <code>len</code> bytes of data into an array of bytes from
+         * this input stream. If <code>pos</code> equals <code>count</code>,
+         * then <code>-1</code> is returned to indicate end of file. Otherwise,
+         * the number <code>k</code> of bytes read is equal to the smaller of
+         * <code>len</code> and <code>count-pos</code>. If <code>k</code> is
+         * positive, then bytes <code>buf[pos]</code> through
+         * <code>buf[pos+k-1]</code> are copied into <code>b[off]</code> through
+         * <code>b[off+k-1]</code> in the manner performed by
+         * <code>System.arraycopy</code>. The value <code>k</code> is added
+         * into <code>pos</code> and <code>k</code> is returned.
+         * <p>
+         * This <code>read</code> method cannot block.
+         * 
+         * @param b
+         *            the buffer into which the data is read.
+         * @param off
+         *            the start offset in the destination array <code>b</code>
+         * @param len
+         *            the maximum number of bytes read.
+         * @return the total number of bytes read into the buffer, or
+         *         <code>-1</code> if there is no more data because the end of the
+         *         stream has been reached.
+         * @exception NullPointerException
+         *                If <code>b</code> is <code>null</code>.
+         * @exception IndexOutOfBoundsException
+         *                If <code>off</code> is negative, <code>len</code> is
+         *                negative, or <code>len</code> is greater than
+         *                <code>b.length - off</code>
+         */
+        public final int read(byte b[], int off, int len)
+        {
+            if (b == null)
+            {
+                throw new NullPointerException();
+            }
+            else if (off < 0 || len < 0 || len > b.length - off)
+            {
+                throw new IndexOutOfBoundsException();
+            }
+            if (pos >= count)
+            {
+                return -1;
+            }
+            if (pos + len > count)
+            {
+                len = count - pos;
+            }
+            if (len <= 0)
+            {
+                return 0;
+            }
+            System.arraycopy(buf, pos, b, off, len);
+            pos += len;
+            return len;
+        }
+
+        /**
+         * Skips <code>n</code> bytes of input from this input stream. Fewer bytes
+         * might be skipped if the end of the input stream is reached. The actual
+         * number <code>k</code> of bytes to be skipped is equal to the smaller of
+         * <code>n</code> and <code>count-pos</code>. The value <code>k</code>
+         * is added into <code>pos</code> and <code>k</code> is returned.
+         * 
+         * @param n
+         *            the number of bytes to be skipped.
+         * @return the actual number of bytes skipped.
+         */
+        public final long skip(long n)
+        {
+            if (pos + n > count)
+            {
+                n = count - pos;
+            }
+            if (n < 0)
+            {
+                return 0;
+            }
+            pos += n;
+            return n;
+        }
+
+        /**
+         * Returns the number of remaining bytes that can be read (or skipped over)
+         * from this input stream.
+         * <p>
+         * The value returned is <code>count&nbsp;- pos</code>, which is the
+         * number of bytes remaining to be read from the input buffer.
+         * 
+         * @return the number of remaining bytes that can be read (or skipped over)
+         *         from this input stream without blocking.
+         */
+        public final int available()
+        {
+            return count - pos;
+        }
+
+        /**
+         * Tests if this <code>InputStream</code> supports mark/reset. The
+         * <code>markSupported</code> method of <code>ByteArrayInputStream</code>
+         * always returns <code>true</code>.
+         * 
+         * @since JDK1.1
+         */
+        public final boolean markSupported()
+        {
+            return true;
+        }
+
+        /**
+         * Set the current marked position in the stream. ByteArrayInputStream
+         * objects are marked at position zero by default when constructed. They may
+         * be marked at another position within the buffer by this method.
+         * <p>
+         * If no mark has been set, then the value of the mark is the offset passed
+         * to the constructor (or 0 if the offset was not supplied).
+         * 
+         * <p>
+         * Note: The <code>readAheadLimit</code> for this class has no meaning.
+         * 
+         * @since JDK1.1
+         */
+        public final void mark(int readAheadLimit)
+        {
+            mark = pos;
+        }
+
+        /**
+         * Resets the buffer to the marked position. The marked position is 0 unless
+         * another position was marked or an offset was specified in the
+         * constructor.
+         */
+        public final void reset()
+        {
+            pos = mark;
+        }
+
+        /**
+         * Closing a <tt>ByteArrayInputStream</tt> has no effect. The methods in
+         * this class can be called after the stream has been closed without
+         * generating an <tt>IOException</tt>.
+         * <p>
+         */
+        public final void close() throws IOException
+        {
+        }
+    }
+    
+    private static class Buffer extends FastByteArrayInputStream
+    {        
+        public Buffer()
+        {
+            super(new byte[] {});
+        }
+
+        public void reset(byte[] input, int start, int length)
+        {
+            this.buf = input;
+            this.count = start + length;
+            this.mark = start;
+            this.pos = start;
+        }
+        
+        public int getPosition()
+        {
+            return pos;
+        }
+        
+        public void setPosition(int position)
+        {
+            pos = position;
+        }        
+
+        public int getLength()
+        {
+            return count;
+        }
+    }
+
+    private Buffer buffer_;
+
+    /** Constructs a new empty buffer. */
+    public DataInputBuffer()
+    {
+        this(new Buffer());
+    }
+
+    private DataInputBuffer(Buffer buffer)
+    {
+        super(buffer);
+        this.buffer_ = buffer;
+    }
+   
+    /** Resets the data that the buffer reads. */
+    public void reset(byte[] input, int length)
+    {
+        buffer_.reset(input, 0, length);
+    }
+
+    /** Resets the data that the buffer reads. */
+    public void reset(byte[] input, int start, int length)
+    {
+        buffer_.reset(input, start, length);
+    }
+
+    /** Returns the current position in the input. */
+    public int getPosition()
+    {
+        return buffer_.getPosition();
+    }
+    
+    /** Set the position within the input */
+    public void setPosition(int position)
+    {
+        buffer_.setPosition(position);
+    }
+
+    /** Returns the length of the input. */
+    public int getLength()
+    {
+        return buffer_.getLength();
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/DataOutputBuffer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/DataOutputBuffer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/DataOutputBuffer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/DataOutputBuffer.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+/**
+ * A {@link DataOutputStream} that writes into a growable in-memory byte array.
+ * The bytes written so far are exposed without copying through
+ * {@link #getData()} (valid only up to {@link #getLength()}), and the buffer
+ * can be recycled with {@link #reset()}. This class is completely thread
+ * unsafe: nothing in it is synchronized.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class DataOutputBuffer extends DataOutputStream
+{
+    /**
+     * Returns an array that can hold at least {@code minCapacity} bytes and
+     * starts with the contents of {@code current}. If {@code current} is
+     * already large enough, it is returned as is. Otherwise a copy is made
+     * whose capacity is the larger of double the old capacity and
+     * {@code minCapacity}. Both nested stream classes share this helper so the
+     * growth policy lives in one place.
+     *
+     * @param current     the existing backing array
+     * @param minCapacity the number of bytes the array must be able to hold
+     * @return {@code current} or a larger copy of it
+     * @throws OutOfMemoryError if {@code minCapacity} is negative, meaning the
+     *                          caller's {@code count + len} overflowed
+     */
+    private static byte[] ensureCapacity(byte[] current, int minCapacity)
+    {
+        if (minCapacity < 0)
+        {
+            throw new OutOfMemoryError("Required buffer size too large");
+        }
+        if (minCapacity <= current.length)
+        {
+            return current;
+        }
+        // If doubling overflows it turns negative, and Math.max falls back to minCapacity.
+        return Arrays.copyOf(current, Math.max(current.length << 1, minCapacity));
+    }
+
+    /*
+     * This is a clone of the ByteArrayOutputStream but w/o the unnecessary
+     * synchronized keyword usage.
+     */
+    public static class FastByteArrayOutputStream extends OutputStream
+    {
+        /** The buffer where data is stored; only the first {@link #count} bytes are valid. */
+        protected byte[] buf;
+
+        /** The number of valid bytes in {@link #buf}. */
+        protected int count;
+
+        /**
+         * Creates a new byte array output stream. The initial capacity is
+         * 32 bytes, and it grows as needed.
+         */
+        public FastByteArrayOutputStream()
+        {
+            this(32);
+        }
+
+        /**
+         * Creates a new byte array output stream with the given initial
+         * capacity, in bytes.
+         *
+         * @param size the initial capacity
+         * @throws IllegalArgumentException if {@code size} is negative
+         */
+        public FastByteArrayOutputStream(int size)
+        {
+            if (size < 0)
+            {
+                throw new IllegalArgumentException("Negative initial size: "
+                        + size);
+            }
+            buf = new byte[size];
+        }
+
+        /**
+         * Appends one byte (the low 8 bits of {@code b}), growing the buffer if needed.
+         *
+         * @param b the byte to be written
+         */
+        @Override
+        public void write(int b)
+        {
+            int newcount = count + 1;
+            buf = ensureCapacity(buf, newcount);
+            buf[count] = (byte) b;
+            count = newcount;
+        }
+
+        /**
+         * Appends {@code len} bytes from {@code b}, starting at {@code off}.
+         *
+         * @param b   the data
+         * @param off the start offset in the data
+         * @param len the number of bytes to write
+         * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
+         *                                   describe a range inside {@code b}
+         */
+        @Override
+        public void write(byte[] b, int off, int len)
+        {
+            // The (off + len) < 0 test catches int overflow of the end index.
+            if ((off < 0) || (off > b.length) || (len < 0)
+                    || ((off + len) > b.length) || ((off + len) < 0))
+            {
+                throw new IndexOutOfBoundsException();
+            }
+            else if (len == 0)
+            {
+                return;
+            }
+            int newcount = count + len;
+            buf = ensureCapacity(buf, newcount);
+            System.arraycopy(b, off, buf, count, len);
+            count = newcount;
+        }
+
+        /**
+         * Writes the valid contents of this stream to {@code out}, as if by
+         * {@code out.write(buf, 0, count)}.
+         *
+         * @param out the output stream to which to write the data
+         * @throws IOException if {@code out} fails
+         */
+        public void writeTo(OutputStream out) throws IOException
+        {
+            out.write(buf, 0, count);
+        }
+
+        /**
+         * Discards all accumulated output by zeroing {@link #count}. The
+         * allocated array is kept for reuse.
+         *
+         * @see java.io.ByteArrayOutputStream#reset()
+         */
+        public void reset()
+        {
+            count = 0;
+        }
+
+        /**
+         * Returns a newly allocated copy of the valid contents.
+         *
+         * @return the current contents of this output stream, as a byte array
+         * @see java.io.ByteArrayOutputStream#toByteArray()
+         */
+        public byte[] toByteArray()
+        {
+            return Arrays.copyOf(buf, count);
+        }
+
+        /**
+         * Returns the number of valid bytes in this output stream.
+         *
+         * @return the value of {@link #count}
+         */
+        public int size()
+        {
+            return count;
+        }
+
+        /**
+         * Decodes the valid contents using the platform's default character
+         * set. Malformed or unmappable input is replaced with that charset's
+         * default replacement string.
+         *
+         * @return String decoded from the buffer's contents
+         */
+        @Override
+        public String toString()
+        {
+            return new String(buf, 0, count);
+        }
+
+        /**
+         * Decodes the valid contents using the named charset.
+         *
+         * @param charsetName the name of a supported charset
+         * @return String decoded from the buffer's contents
+         * @throws UnsupportedEncodingException if the named charset is not supported
+         */
+        public String toString(String charsetName) throws UnsupportedEncodingException
+        {
+            return new String(buf, 0, count, charsetName);
+        }
+
+        /**
+         * Builds a string where each character is
+         * {@code (char) (((hibyte & 0xff) << 8) | (b & 0xff))} for each valid
+         * byte {@code b}.
+         *
+         * @deprecated This method does not properly convert bytes into
+         *             characters; use {@link #toString(String)} or
+         *             {@link #toString()} instead.
+         * @param hibyte the high byte of each resulting Unicode character
+         * @return the current contents of the output stream, as a string
+         */
+        @Deprecated
+        public String toString(int hibyte)
+        {
+            return new String(buf, hibyte, 0, count);
+        }
+
+        /**
+         * Closing has no effect. All methods remain usable afterwards and
+         * never throw {@code IOException} because of a close.
+         */
+        @Override
+        public void close() throws IOException
+        {
+        }
+    }
+
+    /**
+     * The sink behind a DataOutputBuffer. It exposes the raw backing array and
+     * adds bulk copies straight from a {@link DataInput} or {@link ByteBuffer}.
+     */
+    private static class Buffer extends FastByteArrayOutputStream
+    {
+        /** Returns the backing array itself (not a copy); valid up to {@link #getLength()}. */
+        public byte[] getData()
+        {
+            return buf;
+        }
+
+        /** Returns the number of valid bytes in the backing array. */
+        public int getLength()
+        {
+            return count;
+        }
+
+        /** Reads exactly {@code len} bytes from {@code in} and appends them. */
+        public void write(DataInput in, int len) throws IOException
+        {
+            int newcount = prepareAppend(len);
+            in.readFully(buf, count, len);
+            count = newcount;
+        }
+
+        /** Transfers {@code len} bytes from {@code buffer}'s current position and appends them. */
+        public void write(ByteBuffer buffer, int len) throws IOException
+        {
+            int newcount = prepareAppend(len);
+            buffer.get(buf, count, len);
+            count = newcount;
+        }
+
+        /**
+         * Validates {@code len}, grows {@link #buf} so it can take it, and
+         * returns what {@link #count} will be after the append. The count is
+         * committed only after the copy succeeds.
+         */
+        private int prepareAppend(int len)
+        {
+            if (len < 0)
+            {
+                throw new IndexOutOfBoundsException("Negative length: " + len);
+            }
+            int newcount = count + len;
+            buf = ensureCapacity(buf, newcount);
+            return newcount;
+        }
+    }
+
+    /** Typed handle on the sink this stream writes into (the object passed to super). */
+    private final Buffer buffer;
+
+    /** Constructs a new empty buffer. */
+    public DataOutputBuffer()
+    {
+        this(new Buffer());
+    }
+
+    private DataOutputBuffer(Buffer buffer)
+    {
+        super(buffer);
+        this.buffer = buffer;
+    }
+
+    /**
+     * Returns the current contents of the buffer. This is the live backing
+     * array, not a copy, and data is only valid up to {@link #getLength()}.
+     */
+    public byte[] getData()
+    {
+        return buffer.getData();
+    }
+
+    /** Returns the length of the valid data currently in the buffer. */
+    public int getLength()
+    {
+        return buffer.getLength();
+    }
+
+    /**
+     * Resets the buffer to empty. This zeroes the inherited {@code written}
+     * counter (so {@link #size()} restarts at 0) and keeps the backing array
+     * for reuse.
+     *
+     * @return this buffer, for chaining
+     */
+    public DataOutputBuffer reset()
+    {
+        this.written = 0;
+        buffer.reset();
+        return this;
+    }
+
+    /**
+     * Reads {@code length} bytes from {@code in} directly into the buffer.
+     * The bytes bypass DataOutputStream, so the inherited {@code written}
+     * counter (and hence {@link #size()}) is not advanced; use
+     * {@link #getLength()} for the true length.
+     */
+    public void write(DataInput in, int length) throws IOException
+    {
+        buffer.write(in, length);
+    }
+
+    /**
+     * Copies {@code length} bytes from {@code in} directly into the buffer.
+     * As with the DataInput variant, {@link #size()} is not advanced.
+     */
+    public void write(ByteBuffer in, int length) throws IOException
+    {
+        buffer.write(in, length);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedInputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedInputStream.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedInputStream.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedInputStream.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * A <code>BufferedInputStream</code> adds functionality to another input
+ * stream-namely, the ability to buffer the input and to support the
+ * <code>mark</code> and <code>reset</code> methods. When the
+ * <code>BufferedInputStream</code> is created, an internal buffer array is
+ * created. As bytes from the stream are read or skipped, the internal buffer is
+ * refilled as necessary from the contained input stream, many bytes at a time.
+ * The <code>mark</code> operation remembers a point in the input stream and
+ * the <code>reset</code> operation causes all the bytes read since the most
+ * recent <code>mark</code> operation to be reread before new bytes are taken
+ * from the contained input stream.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedInputStream extends FilterInputStream
+{
+    
+    private static int defaultBufferSize = 8192;
+    
+    /**
+     * The internal buffer array where the data is stored. When necessary, it
+     * may be replaced by another array of a different size.
+     */
+    protected volatile byte buf[];
+    
+    /**
+     * Atomic updater to provide compareAndSet for buf. This is necessary
+     * because closes can be asynchronous. We use nullness of buf[] as primary
+     * indicator that this stream is closed. (The "in" field is also nulled out
+     * on close.)
+     */
+    private static final AtomicReferenceFieldUpdater<FastBufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater
+    .newUpdater(FastBufferedInputStream.class, byte[].class, "buf");
+    
+    /**
+     * The index one greater than the index of the last valid byte in the
+     * buffer. This value is always in the range <code>0</code> through
+     * <code>buf.length</code>; elements <code>buf[0]</code> through
+     * <code>buf[count-1]
+     * </code>contain buffered input data obtained from the
+     * underlying input stream.
+     */
+    protected int count;
+    
+    /**
+     * The current position in the buffer. This is the index of the next
+     * character to be read from the <code>buf</code> array.
+     * <p>
+     * This value is always in the range <code>0</code> through
+     * <code>count</code>. If it is less than <code>count</code>, then
+     * <code>buf[pos]</code> is the next byte to be supplied as input; if it
+     * is equal to <code>count</code>, then the next <code>read</code> or
+     * <code>skip</code> operation will require more bytes to be read from the
+     * contained input stream.
+     * 
+     * @see java.io.BufferedInputStream#buf
+     */
+    protected int pos;
+    
+    /**
+     * The value of the <code>pos</code> field at the time the last
+     * <code>mark</code> method was called.
+     * <p>
+     * This value is always in the range <code>-1</code> through
+     * <code>pos</code>. If there is no marked position in the input stream,
+     * this field is <code>-1</code>. If there is a marked position in the
+     * input stream, then <code>buf[markpos]</code> is the first byte to be
+     * supplied as input after a <code>reset</code> operation. If
+     * <code>markpos</code> is not <code>-1</code>, then all bytes from
+     * positions <code>buf[markpos]</code> through <code>buf[pos-1]</code>
+     * must remain in the buffer array (though they may be moved to another
+     * place in the buffer array, with suitable adjustments to the values of
+     * <code>count</code>, <code>pos</code>, and <code>markpos</code>);
+     * they may not be discarded unless and until the difference between
+     * <code>pos</code> and <code>markpos</code> exceeds
+     * <code>marklimit</code>.
+     * 
+     * @see java.io.BufferedInputStream#mark(int)
+     * @see java.io.BufferedInputStream#pos
+     */
+    protected int markpos = -1;
+    
+    /**
+     * The maximum read ahead allowed after a call to the <code>mark</code>
+     * method before subsequent calls to the <code>reset</code> method fail.
+     * Whenever the difference between <code>pos</code> and
+     * <code>markpos</code> exceeds <code>marklimit</code>, then the mark
+     * may be dropped by setting <code>markpos</code> to <code>-1</code>.
+     * 
+     * @see java.io.BufferedInputStream#mark(int)
+     * @see java.io.BufferedInputStream#reset()
+     */
+    protected int marklimit;
+    
+    /**
+     * Check to make sure that underlying input stream has not been nulled out
+     * due to close; if not return it;
+     */
+    private InputStream getInIfOpen() throws IOException
+    {
+        InputStream input = in;
+        if (input == null)
+            throw new IOException("Stream closed");
+        return input;
+    }
+    
+    /**
+     * Check to make sure that buffer has not been nulled out due to close; if
+     * not return it;
+     */
+    private byte[] getBufIfOpen() throws IOException
+    {
+        byte[] buffer = buf;
+        if (buffer == null)
+            throw new IOException("Stream closed");
+        return buffer;
+    }
+    
+    /**
+     * Creates a <code>BufferedInputStream</code> and saves its argument, the
+     * input stream <code>in</code>, for later use. An internal buffer array
+     * is created and stored in <code>buf</code>.
+     * 
+     * @param in
+     *            the underlying input stream.
+     */
+    public FastBufferedInputStream(InputStream in)
+    {
+        this(in, defaultBufferSize);
+    }
+    
+    /**
+     * Creates a <code>BufferedInputStream</code> with the specified buffer
+     * size, and saves its argument, the input stream <code>in</code>, for
+     * later use. An internal buffer array of length <code>size</code> is
+     * created and stored in <code>buf</code>.
+     * 
+     * @param in
+     *            the underlying input stream.
+     * @param size
+     *            the buffer size.
+     * @exception IllegalArgumentException
+     *                if size <= 0.
+     */
+    public FastBufferedInputStream(InputStream in, int size)
+    {
+        super(in);
+        if (size <= 0)
+        {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+    
+    /**
+     * Fills the buffer with more data, taking into account shuffling and other
+     * tricks for dealing with marks. Assumes that it is being called by a
+     * synchronized method. This method also assumes that all data has already
+     * been read in, hence pos > count.
+     */
+    private void fill() throws IOException
+    {
+        byte[] buffer = getBufIfOpen();
+        if (markpos < 0)
+            pos = 0; /* no mark: throw away the buffer */
+        else if (pos >= buffer.length) /* no room left in buffer */
+            if (markpos > 0)
+            { /* can throw away early part of the buffer */
+                int sz = pos - markpos;
+                System.arraycopy(buffer, markpos, buffer, 0, sz);
+                pos = sz;
+                markpos = 0;
+            }
+            else if (buffer.length >= marklimit)
+            {
+                markpos = -1; /* buffer got too big, invalidate mark */
+                pos = 0; /* drop buffer contents */
+            }
+            else
+            { /* grow buffer */
+                int nsz = pos * 2;
+                if (nsz > marklimit)
+                    nsz = marklimit;
+                byte nbuf[] = new byte[nsz];
+                System.arraycopy(buffer, 0, nbuf, 0, pos);
+                if (!bufUpdater.compareAndSet(this, buffer, nbuf))
+                {
+                    // Can't replace buf if there was an async close.
+                    // Note: This would need to be changed if fill()
+                    // is ever made accessible to multiple threads.
+                    // But for now, the only way CAS can fail is via close.
+                    // assert buf == null;
+                    throw new IOException("Stream closed");
+                }
+                buffer = nbuf;
+            }
+        count = pos;
+        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
+        if (n > 0)
+            count = n + pos;
+    }
+    
+    /**
+     * See the general contract of the <code>read</code> method of
+     * <code>InputStream</code>.
+     * 
+     * @return the next byte of data, or <code>-1</code> if the end of the
+     *         stream is reached.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     * @see java.io.FilterInputStream#in
+     */
+    public  int read() throws IOException
+    {
+        if (pos >= count)
+        {
+            fill();
+            if (pos >= count)
+                return -1;
+        }
+        return getBufIfOpen()[pos++] & 0xff;
+    }
+    
+    /**
+     * Read characters into a portion of an array, reading from the underlying
+     * stream at most once if necessary.
+     */
+    private int read1(byte[] b, int off, int len) throws IOException
+    {
+        int avail = count - pos;
+        if (avail <= 0)
+        {
+            /*
+             * If the requested length is at least as large as the buffer, and
+             * if there is no mark/reset activity, do not bother to copy the
+             * bytes into the local buffer. In this way buffered streams will
+             * cascade harmlessly.
+             */
+            if (len >= getBufIfOpen().length && markpos < 0)
+            {
+                return getInIfOpen().read(b, off, len);
+            }
+            fill();
+            avail = count - pos;
+            if (avail <= 0)
+                return -1;
+        }
+        int cnt = (avail < len) ? avail : len;
+        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
+        pos += cnt;
+        return cnt;
+    }
+    
+    /**
+     * Reads bytes from this byte-input stream into the specified byte array,
+     * starting at the given offset.
+     * 
+     * <p>
+     * This method implements the general contract of the corresponding
+     * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
+     * the <code>{@link InputStream}</code> class. As an additional
+     * convenience, it attempts to read as many bytes as possible by repeatedly
+     * invoking the <code>read</code> method of the underlying stream. This
+     * iterated <code>read</code> continues until one of the following
+     * conditions becomes true:
+     * <ul>
+     * 
+     * <li> The specified number of bytes have been read,
+     * 
+     * <li> The <code>read</code> method of the underlying stream returns
+     * <code>-1</code>, indicating end-of-file, or
+     * 
+     * <li> The <code>available</code> method of the underlying stream returns
+     * zero, indicating that further input requests would block.
+     * 
+     * </ul>
+     * If the first <code>read</code> on the underlying stream returns
+     * <code>-1</code> to indicate end-of-file then this method returns
+     * <code>-1</code>. Otherwise this method returns the number of bytes
+     * actually read.
+     * 
+     * <p>
+     * Subclasses of this class are encouraged, but not required, to attempt to
+     * read as many bytes as possible in the same fashion.
+     * 
+     * @param b
+     *            destination buffer.
+     * @param off
+     *            offset at which to start storing bytes.
+     * @param len
+     *            maximum number of bytes to read.
+     * @return the number of bytes read, or <code>-1</code> if the end of the
+     *         stream has been reached.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     */
+    public  int read(byte b[], int off, int len) throws IOException
+    {
+        getBufIfOpen(); // Check for closed stream
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0)
+        {
+            throw new IndexOutOfBoundsException();
+        }
+        else if (len == 0)
+        {
+            return 0;
+        }
+        
+        int n = 0;
+        for (;;)
+        {
+            int nread = read1(b, off + n, len - n);
+            if (nread <= 0)
+                return (n == 0) ? nread : n;
+            n += nread;
+            if (n >= len)
+                return n;
+            // if not closed but no bytes available, return
+            InputStream input = in;
+            if (input != null && input.available() <= 0)
+                return n;
+        }
+    }
+    
+    /**
+     * See the general contract of the <code>skip</code> method of
+     * <code>InputStream</code>.
+     * 
+     * @exception IOException
+     *                if the stream does not support seek, or if this input
+     *                stream has been closed by invoking its {@link #close()}
+     *                method, or an I/O error occurs.
+     */
+    public  long skip(long n) throws IOException
+    {
+        getBufIfOpen(); // Check for closed stream
+        if (n <= 0)
+        {
+            return 0;
+        }
+        long avail = count - pos;
+        
+        if (avail <= 0)
+        {
+            // If no mark position set then don't keep in buffer
+            if (markpos < 0)
+                return getInIfOpen().skip(n);
+            
+            // Fill in buffer to save bytes for reset
+            fill();
+            avail = count - pos;
+            if (avail <= 0)
+                return 0;
+        }
+        
+        long skipped = (avail < n) ? avail : n;
+        pos += skipped;
+        return skipped;
+    }
+    
+    /**
+     * Returns an estimate of the number of bytes that can be read (or skipped
+     * over) from this input stream without blocking by the next invocation of a
+     * method for this input stream. The next invocation might be the same
+     * thread or another thread. A single read or skip of this many bytes will
+     * not block, but may read or skip fewer bytes.
+     * <p>
+     * This method returns the sum of the number of bytes remaining to be read
+     * in the buffer (<code>count&nbsp;- pos</code>) and the result of
+     * calling the {@link java.io.FilterInputStream#in in}.available().
+     * 
+     * @return an estimate of the number of bytes that can be read (or skipped
+     *         over) from this input stream without blocking.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     */
+    public  int available() throws IOException
+    {
+        return getInIfOpen().available() + (count - pos);
+    }
+    
+    /**
+     * See the general contract of the <code>mark</code> method of
+     * <code>InputStream</code>.
+     * 
+     * @param readlimit
+     *            the maximum limit of bytes that can be read before the mark
+     *            position becomes invalid.
+     * @see java.io.BufferedInputStream#reset()
+     */
+    public  void mark(int readlimit)
+    {
+        marklimit = readlimit;
+        markpos = pos;
+    }
+    
+    /**
+     * See the general contract of the <code>reset</code> method of
+     * <code>InputStream</code>.
+     * <p>
+     * If <code>markpos</code> is <code>-1</code> (no mark has been set or
+     * the mark has been invalidated), an <code>IOException</code> is thrown.
+     * Otherwise, <code>pos</code> is set equal to <code>markpos</code>.
+     * 
+     * @exception IOException
+     *                if this stream has not been marked or, if the mark has
+     *                been invalidated, or the stream has been closed by
+     *                invoking its {@link #close()} method, or an I/O error
+     *                occurs.
+     * @see java.io.BufferedInputStream#mark(int)
+     */
+    public  void reset() throws IOException
+    {
+        getBufIfOpen(); // Cause exception if closed
+        if (markpos < 0)
+            throw new IOException("Resetting to invalid mark");
+        pos = markpos;
+    }
+    
+    /**
+     * Tests if this input stream supports the <code>mark</code> and
+     * <code>reset</code> methods. The <code>markSupported</code> method of
+     * <code>BufferedInputStream</code> returns <code>true</code>.
+     * 
+     * @return a <code>boolean</code> indicating if this stream type supports
+     *         the <code>mark</code> and <code>reset</code> methods.
+     * @see java.io.InputStream#mark(int)
+     * @see java.io.InputStream#reset()
+     */
+    public boolean markSupported()
+    {
+        return true;
+    }
+    
+    /**
+     * Closes this input stream and releases any system resources associated
+     * with the stream. Once the stream has been closed, further read(),
+     * available(), reset(), or skip() invocations will throw an IOException.
+     * Closing a previously closed stream has no effect.
+     * 
+     * @exception IOException
+     *                if an I/O error occurs.
+     */
+    public void close() throws IOException
+    {
+        byte[] buffer;
+        while ((buffer = buf) != null)
+        {
+            if (bufUpdater.compareAndSet(this, buffer, null))
+            {
+                InputStream input = in;
+                in = null;
+                if (input != null)
+                    input.close();
+                return;
+            }
+            // Else retry in case a new buf was CASed in fill()
+        }
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedOutputStream.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedOutputStream.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedOutputStream.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+/**
+ * A buffered output stream. By setting up such an output stream, an
+ * application can write bytes to the underlying output stream without
+ * necessarily causing a call to the underlying stream for each byte
+ * written: bytes are collected in {@link #buf} and passed on in chunks.
+ * <p>
+ * None of the methods of this class are <code>synchronized</code>, so an
+ * instance should not be shared between threads without external locking.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedOutputStream extends FilterOutputStream
+{
+    /** Buffer size, in bytes, used by the single-argument constructor. */
+    private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+    /**
+     * The internal buffer where data is stored.
+     */
+    protected byte[] buf;
+
+    /**
+     * The number of valid bytes in the buffer. This value is always in the
+     * range <tt>0</tt> through <tt>buf.length</tt>; elements
+     * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte
+     * data.
+     */
+    protected int count;
+
+    /**
+     * Creates a new buffered output stream, with an 8192-byte buffer, to
+     * write data to the specified underlying output stream.
+     * 
+     * @param out
+     *            the underlying output stream.
+     */
+    public FastBufferedOutputStream(OutputStream out)
+    {
+        this(out, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream with the specified buffer size.
+     * 
+     * @param out
+     *            the underlying output stream.
+     * @param size
+     *            the buffer size.
+     * @exception IllegalArgumentException
+     *                if size &lt;= 0.
+     */
+    public FastBufferedOutputStream(OutputStream out, int size)
+    {
+        super(out);
+        if (size <= 0)
+        {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+
+    /**
+     * Writes any buffered bytes to the underlying stream and empties the
+     * buffer. Does not flush the underlying stream itself.
+     */
+    private void flushBuffer() throws IOException
+    {
+        if (count > 0)
+        {
+            out.write(buf, 0, count);
+            count = 0;
+        }
+    }
+
+    /**
+     * Writes the specified byte to this buffered output stream, first
+     * emptying the buffer into the underlying stream if it is full.
+     * 
+     * @param b
+     *            the byte to be written; only the low-order 8 bits are kept.
+     * @exception IOException
+     *                if an I/O error occurs.
+     */
+    @Override
+    public void write(int b) throws IOException
+    {
+        if (count >= buf.length)
+        {
+            flushBuffer();
+        }
+        buf[count++] = (byte) b;
+    }
+
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at
+     * offset <code>off</code> to this buffered output stream.
+     * 
+     * <p>
+     * Ordinarily this method stores bytes from the given array into this
+     * stream's buffer, flushing the buffer to the underlying output stream as
+     * needed. If the requested length is at least as large as this stream's
+     * buffer, however, then this method will flush the buffer and write the
+     * bytes directly to the underlying output stream. Thus redundant
+     * buffered streams will not copy data unnecessarily.
+     * 
+     * @param b
+     *            the data.
+     * @param off
+     *            the start offset in the data.
+     * @param len
+     *            the number of bytes to write.
+     * @exception IOException
+     *                if an I/O error occurs.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        if (len >= buf.length)
+        {
+            /*
+             * The request is at least as large as the whole buffer: flush the
+             * pending bytes first (preserving output order), then hand the
+             * caller's data straight to the underlying stream instead of
+             * copying it. This way buffered streams cascade harmlessly.
+             */
+            flushBuffer();
+            out.write(b, off, len);
+            return;
+        }
+        if (len > buf.length - count)
+        {
+            // Not enough room left in the buffer for this request; make space.
+            flushBuffer();
+        }
+        System.arraycopy(b, off, buf, count, len);
+        count += len;
+    }
+
+    /**
+     * Flushes this buffered output stream. This forces any buffered output
+     * bytes to be written out to the underlying output stream, and then
+     * flushes the underlying stream as well.
+     * 
+     * @exception IOException
+     *                if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    @Override
+    public void flush() throws IOException
+    {
+        flushBuffer();
+        out.flush();
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/ICompactSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/ICompactSerializer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/ICompactSerializer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/ICompactSerializer.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.DataInputStream;
+
+/**
+ * Allows for the controlled serialization/deserialization of a given type.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ *
+ * @param <T> the type that this serializer writes and reads.
+ */
+
+public interface ICompactSerializer<T>
+{
+    /**
+     * Serialize the specified object into the specified DataOutputStream.
+     * @param t object that needs to be serialized
+     * @param dos DataOutputStream into which serialization needs to happen.
+     * @throws IOException if the object cannot be written to <code>dos</code>
+     */
+    public void serialize(T t, DataOutputStream dos) throws IOException;
+
+    /**
+     * Deserialize an object from the specified DataInputStream.
+     * @param dis DataInputStream from which deserialization needs to happen.
+     * @throws IOException if an object cannot be read from <code>dis</code>
+     * @return the object that was deserialized
+     */
+    public T deserialize(DataInputStream dis) throws IOException;
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.List;
+
+/**
+ * Interface to read from the SequenceFile abstraction.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFileReader
+{
+    /**
+     * @return the name of the file.
+     */
+    public String getFileName();
+
+    /**
+     * NOTE(review): presumably the offset of the end of the file (its
+     * length); confirm against implementations.
+     * @throws IOException
+     */
+    public long getEOF() throws IOException;
+
+    /**
+     * Get the current position of the file pointer.
+     * @return current file pointer position
+     * @throws IOException
+     */
+    public long getCurrentPosition() throws IOException;
+
+    /**
+     * NOTE(review): what counts as "healthy" is defined by implementations;
+     * confirm there before relying on this check.
+     * @throws IOException
+     */
+    public boolean isHealthyFileDescriptor() throws IOException;
+
+    /**
+     * Seeks the file pointer to the specified position.
+     * @param position position within the file to seek to.
+     * @throws IOException
+     */
+    public void seek(long position) throws IOException;
+
+    /**
+     * @return presumably true once the file pointer has reached the end of
+     *         the file; confirm against implementations.
+     * @throws IOException
+     */
+    public boolean isEOF() throws IOException;
+
+    /**
+     * Be extremely careful while using this API. It is currently
+     * used to read the commit log header from the commit logs.
+     * Treat this as an internal API.
+     *
+     * @param bytes byte array that the data is read into.
+     * @throws IOException
+     */
+    public void readDirect(byte[] bytes) throws IOException;
+
+    /**
+     * Read a long value from the underlying sub system.
+     * @return value read
+     * @throws IOException
+     */
+    public long readLong() throws IOException;
+
+    /**
+     * This function is used to help out subsequent reads
+     * on the specified key. It reads the keys prior to this
+     * one on disk so that the buffer cache is hot.
+     *
+     * @param key key for which we are performing the touch.
+     * @param fData true implies we fetch the data into buffer cache.
+     * @return NOTE(review): the meaning of the returned value is not
+     *         documented by this interface; check implementations.
+     * @throws IOException
+     */
+    public long touch(String key , boolean fData) throws IOException;
+
+    /**
+     * This method helps in retrieving the offset of the specified
+     * key in the file using the block index.
+     *
+     * @param key key whose position we need in the block index.
+     * @return the offset of <code>key</code> in the file.
+     * @throws IOException
+     */
+    public long getPositionFromBlockIndex(String key) throws IOException;
+
+    /**
+     * This method returns the position of the specified key and the
+     * size of its data segment from the block index.
+     *
+     * @param key key whose block metadata we are interested in.
+     * @return an instance of the block metadata for this key.
+     * @throws IOException
+     */
+    public SSTable.BlockMetadata getBlockMetadata(String key) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOutputBuffer
+     * passed in.
+     *
+     * @param bufOut DataOutputBuffer that needs to be filled.
+     * @return number of bytes read.
+     * @throws IOException
+     */
+    public long next(DataOutputBuffer bufOut) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOutputBuffer
+     * passed in.
+     *
+     * @param key key we are interested in.
+     * @param bufOut DataOutputBuffer that needs to be filled.
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return the number of bytes read.
+     */
+    public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOutputBuffer
+     * passed in.
+     *
+     * @param key key we are interested in.
+     * @param bufOut DataOutputBuffer that needs to be filled.
+     * @param column name of the column in our format.
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return number of bytes that were read.
+     */
+    public long next(String key, DataOutputBuffer bufOut, String column, Coordinate section) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOutputBuffer
+     * passed in. Always use this method to query for application
+     * specific data as it will have indexes.
+     *
+     * @param key key we are interested in.
+     * @param bufOut DataOutputBuffer that needs to be filled.
+     * @param columnFamilyName the name of the column family only, without the ":"
+     * @param columnNames the list of columns in the column family
+     *                    that we want to return
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return number of bytes read.
+     */
+    public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, Coordinate section) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOutputBuffer
+     * passed in.
+     *
+     * @param key key we are interested in.
+     * @param bufOut DataOutputBuffer that needs to be filled.
+     * @param column name of the column in our format.
+     * @param timeRange time range we are interested in.
+     * @param section region of the file that needs to be read
+     * @throws IOException
+     * @return number of bytes that were read.
+     */
+    public long next(String key, DataOutputBuffer bufOut, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException;
+
+    /**
+     * Close the file after reading.
+     * @throws IOException
+     */
+    public void close() throws IOException;
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java Mon Mar  2 07:57:22 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.cassandra.db.PrimaryKey;
+
+
+/**
+ * An interface for writing into the SequenceFile abstraction.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFileWriter
+{
+    /**
+     * Get the current position of the file pointer.
+     * @return current file pointer position
+     * @throws IOException
+     */
+    public long getCurrentPosition() throws IOException;
+
+    /**
+     * @return the last file modification time.
+     */
+    public long lastModified();
+
+    /**
+     * Seeks the file pointer to the specified position.
+     * @param position position within the file to seek to.
+     * @throws IOException
+     */
+    public void seek(long position) throws IOException;
+
+    /**
+     * Appends the buffer to the underlying SequenceFile.
+     * @param buffer buffer which contains the serialized data.
+     * @throws IOException
+     */
+    public void append(DataOutputBuffer buffer) throws IOException;
+
+    /**
+     * Appends the key and the value to the underlying SequenceFile.
+     * @param keyBuffer buffer which contains the serialized key.
+     * @param buffer buffer which contains the serialized data.
+     * @throws IOException
+     */
+    public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException;
+
+    /**
+     * Appends the key and the value to the underlying SequenceFile.
+     * @param key key associated with this piece of data.
+     * @param buffer buffer containing the serialized data.
+     * @throws IOException
+     */
+    public void append(String key, DataOutputBuffer buffer) throws IOException;
+
+    /**
+     * Appends the key and the value to the underlying SequenceFile.
+     * @param key key associated with this piece of data.
+     * @param value byte array containing the serialized data.
+     * @throws IOException
+     */
+    public void append(String key, byte[] value) throws IOException;
+
+    /**
+     * Appends the key and the long value to the underlying SequenceFile.
+     * This is used in the construction of the index file associated with
+     * an SSTable.
+     * @param key key associated with this piece of data.
+     * @param value value associated with this key.
+     * @throws IOException
+     */
+    public void append(String key, long value) throws IOException;
+
+    /**
+     * Be extremely careful while using this API. It is currently
+     * used to write the commit log header in the commit logs.
+     * If not used carefully it could completely screw up reads
+     * of other key/value pairs that are written.
+     *
+     * @param bytes serialized version of the commit log header.
+     * @return NOTE(review): the meaning of the returned value is not
+     *         documented by this interface; check implementations.
+     * @throws IOException
+     */
+    public long writeDirect(byte[] bytes) throws IOException;
+
+    /**
+     * Write a long into the underlying sub system.
+     * @param value long to be written
+     * @throws IOException
+     */
+    public void writeLong(long value) throws IOException;
+
+    /**
+     * Close the file which is being used for the write.
+     * @throws IOException
+     */
+    public void close() throws IOException;
+
+    /**
+     * Close the file after appending the passed in footer information.
+     * @param footer footer information.
+     * @param size size of the footer.
+     * @throws IOException
+     */
+    public void close(byte[] footer, int size) throws IOException;
+
+    /**
+     * @return the name of the file.
+     */
+    public String getFileName();
+
+    /**
+     * @return the size of the file.
+     * @throws IOException
+     */
+    public long getFileSize() throws IOException;
+}