You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC

svn commit: r749205 [13/16] - in /incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/ config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/ cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/...

Added: incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,789 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.ContinuationContext;
+import org.apache.cassandra.concurrent.ContinuationsExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.IContinuable;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.commons.javaflow.Continuation;
+import org.apache.log4j.Logger;
+
+/**
+ * A <code>AIORandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access.
+ * <P>
+ * 
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class AIORandomAccessFile extends RandomAccessFile
+{   
+    private final static Logger logger_ = Logger.getLogger(AIORandomAccessFile.class);
+    private final static ThreadLocal<Runnable> tls_ = new InheritableThreadLocal<Runnable>();    
+    static final int LogBuffSz_ = 16; // 64K buffer
+    public static final int BuffSz_ = (1 << LogBuffSz_);
+    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+    /* Used to lock the creation of the disk thread pool instance */
+    private static Lock createLock_ = new ReentrantLock(); 
+    private static ExecutorService diskIOPool_;          
+   
+    /**
+     * Read request used only when running in Continuations mode.
+     * When run, it stores the continuation to resume in its context
+     * and then submits an asynchronous read into the enclosing file's
+     * buffer at the current disk position, registering a
+     * ReadCompletionHandler to be notified on completion.
+     *
+     * @author alakshman
+     *
+     */
+    class AIOReader implements IContinuable
+    {
+        /* context carrying the continuation that is resumed on read completion */
+        private ContinuationContext continuationCtx_;
+
+        AIOReader(ContinuationContext continuationCtx)
+        {
+            this.continuationCtx_ = continuationCtx;
+        }
+
+        public void run(Continuation c)
+        {
+            /* remember the continuation first, then submit the read request */
+            this.continuationCtx_.setContinuation(c);
+            ByteBuffer target = ByteBuffer.wrap( buffer_ );
+            ReadCompletionHandler onCompletion = new ReadCompletionHandler();
+            fileChannel_.read(target, diskPos_, this.continuationCtx_, onCompletion);
+        }
+    }
+    
+    /**
+     * Read completion handler for the AIO framework. The attachment
+     * that is passed in is the ContinuationContext holding the
+     * Continuation that needs to be resumed on read completion.
+     * The result handed to completed() is the number of bytes read.
+     *
+     * @author alakshman
+     *
+     */
+    class ReadCompletionHandler implements CompletionHandler<Integer, ContinuationContext>
+    {
+        public void cancelled(ContinuationContext attachment)
+        {
+        }
+
+        /**
+         * Stores the byte count in the attachment and, if the attachment
+         * carries a continuation, resumes it and hands the returned
+         * continuation to ContinuationsExecutor.doPostProcessing().
+         * A null attachment (blocking reads pass none) only logs.
+         */
+        public void completed(Integer result, ContinuationContext attachment)
+        {
+            logger_.debug("Bytes read " + result);
+            if ( attachment != null )
+            {
+                Continuation c = attachment.getContinuation();
+                attachment.result(result);
+                if ( c != null )
+                {
+                    c = Continuation.continueWith(c, attachment);
+                    ContinuationsExecutor.doPostProcessing(c);
+                }
+            }
+        }
+
+        public void failed(Throwable th, ContinuationContext attachment)
+        {
+            /* do not drop read failures silently; at least make them visible */
+            logger_.warn("Asynchronous read failed", th);
+            // NOTE(review): the continuation in the attachment is not resumed
+            // here, so a continuation-mode reader stays suspended after a
+            // failed read -- confirm how failures should be propagated.
+        }
+    }
+    
+    /*
+     * This implementation is based on the buffer implementation in Modula-3's
+     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+     */
+    private boolean dirty_; // true iff unflushed bytes exist
+    private boolean closed_; // true iff the file is closed
+    private long curr_; // current position in file
+    private long lo_, hi_; // bounds on characters in "buff"
+    private byte[] buffer_ = new byte[0]; // local buffer
+    private long maxHi_; // this.lo + this.buff.length
+    private boolean hitEOF_; // buffer contains last file block?
+    private long diskPos_; // disk position    
+    private AsynchronousFileChannel fileChannel_; // asynchronous file channel used for AIO.
+    private boolean bContinuations_; // indicates if used in continuations mode.    
+    
+    /*
+     * To describe the above fields, we introduce the following abstractions for
+     * the file "f":
+     * 
+     * len(f) the length of the file curr(f) the current position in the file
+     * c(f) the abstract contents of the file disk(f) the contents of f's
+     * backing disk file closed(f) true iff the file is closed
+     * 
+     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+     * operation has the effect of making "disk(f)" identical to "c(f)".
+     * 
+     * A file is said to be *valid* if the following conditions hold:
+     * 
+     * V1. The "closed" and "curr" fields are correct:
+     * 
+     * f.closed == closed(f) f.curr == curr(f)
+     * 
+     * V2. The current position is either contained in the buffer, or just past
+     * the buffer:
+     * 
+     * f.lo <= f.curr <= f.hi
+     * 
+     * V3. Any (possibly) unflushed characters are stored in "f.buff":
+     * 
+     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+     * 
+     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+     * 
+     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+     * disk(f)[i])
+     * 
+     * V5. "f.dirty" is true iff the buffer contains bytes that should be
+     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+     * 
+     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+     * 
+     * V6. this.maxHi == this.lo + this.buff.length
+     * 
+     * Note that "f.buff" can be "null" in a valid file, since the range of
+     * characters in V3 is empty when "f.lo == f.curr".
+     * 
+     * A file is said to be *ready* if the buffer contains the current position,
+     * i.e., when:
+     * 
+     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+     * 
+     * When a file is ready, reading or writing a single byte can be performed
+     * by reading or writing the in-memory buffer without performing a disk
+     * operation.
+     */
+    
+    /**
+     * Open an AIORandomAccessFile in "rw" mode with the default
+     * buffer size (BuffSz_) and without continuations support.
+     * @param file file to be opened.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(File file) throws IOException
+    {
+        super(file, "rw");
+        this.init(file, 0, false);
+    }
+    
+    /**
+     * Open an AIORandomAccessFile in "rw" mode with the default
+     * buffer size (BuffSz_).
+     * @param file file to be opened.
+     * @param bContinuations specify if continuations
+     *        support is required.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(File file, boolean bContinuations) throws IOException
+    {
+        super(file, "rw");
+        this.init(file, 0, bContinuations);        
+    }
+    
+    /**
+     * Open an AIORandomAccessFile in "rw" mode without
+     * continuations support.
+     * @param file file to be opened
+     * @param size amount of data to be buffered as part
+     *        of r/w operations; values not larger than BuffSz_
+     *        result in a buffer of BuffSz_ bytes.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(File file, int size) throws IOException
+    {
+        super(file, "rw");
+        init(file, size, false);
+    }
+    
+    /**
+     * Open an AIORandomAccessFile in "rw" mode.
+     * @param file file to be opened
+     * @param size amount of data to be buffered as part
+     *        of r/w operations; values not larger than BuffSz_
+     *        result in a buffer of BuffSz_ bytes.
+     * @param bContinuations specify if continuations
+     *        support is required.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(File file, int size, boolean bContinuations) throws IOException
+    {
+        super(file, "rw");
+        init(file, size, bContinuations);
+    }
+    
+    /**
+     * Open an AIORandomAccessFile in "rw" mode with the default
+     * buffer size (BuffSz_) and without continuations support.
+     * @param name name of the file to be opened.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(String name) throws IOException
+    {
+        super(name, "rw");
+        this.init(new File(name), 0, false);
+    }
+    
+    /**
+     * Open an AIORandomAccessFile in "rw" mode with the default
+     * buffer size (BuffSz_).
+     * @param name name of the file to be opened.
+     * @param bContinuations specify if continuations
+     *        support is required.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(String name, boolean bContinuations) throws IOException
+    {
+        super(name, "rw");
+        this.init(new File(name), 0, bContinuations);
+    }
+    
+    /**
+     * Open an AIORandomAccessFile in "rw" mode without
+     * continuations support.
+     * @param name name of the file to be opened.
+     * @param size buffering size to be used; values not larger
+     *        than BuffSz_ result in a buffer of BuffSz_ bytes.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(String name, int size) throws IOException
+    {
+        super(name, "rw");
+        this.init(new File(name), size, false);
+    }
+    
+    /**
+     * Open an AIORandomAccessFile in "rw" mode.
+     * @param name name of the file to be opened.
+     * @param size buffering size to be used; values not larger
+     *        than BuffSz_ result in a buffer of BuffSz_ bytes.
+     * @param bContinuations specify if continuations
+     *        support is required.
+     * @throws IOException if the file or its asynchronous channel
+     *         cannot be opened.
+     */
+    public AIORandomAccessFile(String name, int size, boolean bContinuations) throws IOException
+    {
+        super(name, "rw");
+        this.init(new File(name), size, bContinuations);
+    }
+    
+    /**
+     * Common constructor logic: resets the buffer bookkeeping, allocates
+     * the local buffer, lazily creates the shared disk I/O thread pool and
+     * opens the asynchronous channel (CREATE, WRITE, READ) on the file.
+     *
+     * @param file file backing this instance.
+     * @param size requested buffer size; the buffer is never smaller
+     *        than BuffSz_.
+     * @param bVal true if reads should be performed in continuations mode.
+     * @throws IOException if the asynchronous channel cannot be opened.
+     */
+    private void init(File file, int size, boolean bVal) throws IOException
+    {
+        bContinuations_ = bVal;
+        this.dirty_ = this.closed_ = false;
+        this.lo_ = this.curr_ = this.hi_ = 0;
+        this.buffer_ = new byte[Math.max(size, BuffSz_)];
+        /* V6: maxHi == lo + buff.length -- the buffer may be larger than BuffSz_ */
+        this.maxHi_ = this.lo_ + (long) this.buffer_.length;
+        this.hitEOF_ = false;
+        this.diskPos_ = 0L;
+
+        /*
+         * Set up the shared disk I/O pool. The pool reference is not
+         * volatile, so it is always checked while holding the lock rather
+         * than with an unlocked first check.
+         */
+        ExecutorService ioPool;
+        createLock_.lock();
+        try
+        {
+            if ( diskIOPool_ == null )
+            {
+                int maxThreads = DatabaseDescriptor.getThreadsPerPool();
+                diskIOPool_ = new ContinuationsExecutor( maxThreads,
+                        maxThreads,
+                        Integer.MAX_VALUE,
+                        TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<Runnable>(),
+                        new ThreadFactoryImpl("DISK-IO-POOL")
+                        );
+            }
+            ioPool = diskIOPool_;
+        }
+        finally
+        {
+            createLock_.unlock();
+        }
+
+        /* set up the asynchronous file channel */
+        OpenOption[] openOptions = new OpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ};
+        Set<OpenOption> set = new HashSet<OpenOption>( Arrays.asList(openOptions) );
+        fileChannel_ = AsynchronousFileChannel.open(file.toPath(), set, ioPool);
+    }
+    
+    /**
+     * Flushes any dirty bytes (waiting for that write to be attempted),
+     * then closes the asynchronous channel and the underlying
+     * RandomAccessFile. Both are closed even if the final flush or the
+     * channel close throws.
+     *
+     * @throws IOException if flushing or closing fails.
+     */
+    public void close() throws IOException
+    {
+        try
+        {
+            this.onClose();
+        }
+        finally
+        {
+            this.closed_ = true;
+            try
+            {
+                fileChannel_.close();
+            }
+            finally
+            {
+                /* release the descriptor opened by the RandomAccessFile constructor */
+                super.close();
+            }
+        }
+    }
+    
+    /**
+     * Submit any bytes in the file's buffer that have not yet been
+     * written to disk. Simply delegates to flushBuffer().
+     */
+    public void flush() throws IOException
+    {
+        flushBuffer();
+    }
+    
+    /**
+     * Flush any dirty bytes in the buffer to disk. The write is submitted
+     * at file position lo_ without waiting for it to complete; afterwards
+     * the disk position is set to curr_ and the buffer is marked clean.
+     * Does nothing when the buffer is not dirty.
+     */
+    private void flushBuffer() throws IOException
+    {
+        if (this.dirty_)
+        {
+            doWrite(this.lo_, false);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+        }
+    }
+    
+    /**
+     * Invoked when close() is invoked and causes the flush
+     * of the last few bytes to block when the write is submitted.
+     * Does nothing when the buffer is not dirty.
+     * @throws IOException
+     */
+    private void onClose() throws IOException
+    {
+        if (this.dirty_)
+        {
+            doWrite(this.lo_, true);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+        }
+    }
+    
+    /**
+     * This method submits an I/O for write where the write happens at
+     * <i>position</i> within the file. The bytes written are the first
+     * (curr_ - lo_) bytes of the local buffer.
+     * <p>
+     * The bytes are copied into a private snapshot before being handed to
+     * the channel. The in-flight write is therefore unaffected by later
+     * changes to the local buffer, and the local buffer keeps its contents,
+     * so data flushed because of a backward seek inside the buffer can
+     * still be read from it.
+     * <p>
+     * Unless <i>onClose</i> is set, the write is not waited for.
+     *
+     * @param position to seek to within the file
+     * @param onClose indicates if this method was invoked on a close().
+     */
+    private void doWrite(long position, boolean onClose)
+    {
+        int length = (int) (this.curr_ - this.lo_);
+        ByteBuffer buffer = ByteBuffer.wrap(Arrays.copyOf(buffer_, length));
+        Future<Integer> futurePtr = fileChannel_.write(buffer, position, null, new WriteCompletionHandler<Integer>());
+        if ( onClose )
+        {
+            try
+            {
+                /* this will block but will execute only on a close() */
+                futurePtr.get();
+            }
+            catch (ExecutionException ex)
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+            }
+            catch (InterruptedException ex)
+            {
+                logger_.warn(LogUtil.throwableToString(ex));
+                /* preserve the interrupt for whoever called close() */
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+    
+    /**
+     * Read at most "this.buff.length" bytes into "this.buff", returning the
+     * number of bytes read. If the return result is less than
+     * "this.buff.length", then EOF was read; in that case hitEOF_ is set
+     * and the unread tail of the buffer is filled with 0xff. A full read
+     * clears hitEOF_ again. The call blocks until the asynchronous read
+     * has completed.
+     *
+     * @return number of bytes read, 0 when already at end-of-file.
+     * @throws IOException if the read fails or the wait is interrupted.
+     */
+    private int fillBuffer() throws IOException
+    {
+        ByteBuffer buffer = ByteBuffer.allocate(buffer_.length);
+        Future<Integer> futurePtr = fileChannel_.read(buffer, this.diskPos_, null, new ReadCompletionHandler());
+
+        int cnt;
+        try
+        {
+            /*
+             * This should block
+            */
+            cnt = futurePtr.get();
+        }
+        catch (ExecutionException ex)
+        {
+            /* a failed read must not be mistaken for an empty one */
+            throw new IOException("Read at position " + this.diskPos_ + " failed", ex);
+        }
+        catch (InterruptedException ex)
+        {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while reading at position " + this.diskPos_, ex);
+        }
+
+        /* a negative count signals end-of-file with nothing read */
+        if ( cnt < 0 )
+            cnt = 0;
+        buffer_ = buffer.array();
+        this.hitEOF_ = (cnt < this.buffer_.length);
+        if ( this.hitEOF_ )
+        {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(buffer_, cnt, this.buffer_.length, (byte) 0xff);
+        }
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+ 
+    /**
+     * Read as much data as indicated by the size of the buffer.
+     * This method is only invoked in continuation mode: it registers an
+     * AIOReader with the ContinuationsExecutor, suspends the current
+     * continuation and, once resumed, picks the byte count up from the
+     * continuation context.
+     * <p>
+     * If fewer bytes than the buffer holds were read, hitEOF_ is set and
+     * the unread tail of the buffer is filled with 0xff; a full read
+     * clears hitEOF_ again.
+     *
+     * @return number of bytes read, 0 when already at end-of-file.
+     */
+    private int fillBuffer2()
+    {
+        ContinuationContext continuationCtx = (ContinuationContext)Continuation.getContext();
+        IContinuable reader = new AIOReader( continuationCtx );
+        ContinuationsExecutor.putInTls(reader);
+        /* suspend the continuation */
+        Continuation.suspend();
+
+        continuationCtx = (ContinuationContext)Continuation.getContext();
+        int cnt = (Integer)continuationCtx.result();
+
+        /* a negative count signals end-of-file with nothing read */
+        if ( cnt < 0 )
+            cnt = 0;
+        this.hitEOF_ = (cnt < this.buffer_.length);
+        if ( this.hitEOF_ )
+        {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(buffer_, cnt, this.buffer_.length, (byte) 0xff);
+        }
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+    
+    /**
+     * Positions <code>this.curr</code> at <code>pos</code>.
+     * <p>
+     * If <code>pos</code> lies inside the current buffer no read is
+     * performed; the buffer is only flushed when moving backwards.
+     * Otherwise the current buffer is flushed and the buffer that starts
+     * at the BuffSz boundary at or below <code>pos</code> is loaded, using
+     * the continuation-style read when continuations mode is enabled.
+     */
+    public void seek(long pos) throws IOException
+    {
+        boolean insideBuffer = (pos >= this.lo_) && (pos < this.hi_);
+        if (insideBuffer)
+        {
+            // no read required, but seeking backwards must flush to maintain V4
+            if (pos < this.curr_)
+            {
+                this.flushBuffer();
+            }
+        }
+        else
+        {
+            // outside of the current buffer -- flush, then read the new one
+            this.flushBuffer();
+            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+            this.maxHi_ = this.lo_ + (long) this.buffer_.length;
+            this.diskPos_ = this.lo_;
+
+            /* reads are performed in continuation style when requested */
+            int bytesRead = bContinuations_ ? fillBuffer2() : fillBuffer();
+            this.hi_ = this.lo_ + (long) bytesRead;
+        }
+        this.curr_ = pos;
+    }
+
+    
+    /**
+     * Returns the current logical position in the file as tracked by
+     * this buffered implementation.
+     */
+    public long getFilePointer()
+    {
+        return curr_;
+    }
+    
+    public long length() throws IOException
+    {
+        return Math.max(this.curr_, super.length());
+    }
+    
+    /**
+     * Reads a single byte at the current position and advances by one.
+     *
+     * @return the byte as a value in 0..255, or -1 at end-of-file.
+     */
+    public int read() throws IOException
+    {
+        boolean bufferExhausted = this.curr_ >= this.hi_;
+        if (bufferExhausted)
+        {
+            // test for EOF
+            if (this.hitEOF_)
+            {
+                return -1;
+            }
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+            {
+                return -1;
+            }
+        }
+        int offset = (int) (this.curr_ - this.lo_);
+        int value = this.buffer_[offset] & 0xFF; // convert byte -> int
+        this.curr_++;
+        return value;
+    }
+    
+    /**
+     * Reads up to <code>b.length</code> bytes into <code>b</code>,
+     * starting at index 0.
+     */
+    public int read(byte[] b) throws IOException
+    {
+        return read(b, 0, b.length);
+    }
+    
+    /**
+     * Copies up to <code>len</code> bytes from the buffer into
+     * <code>b</code> starting at <code>off</code>. At most the bytes left
+     * in the current buffer are returned by a single call.
+     *
+     * @return number of bytes copied, or -1 at end-of-file.
+     */
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        boolean bufferExhausted = this.curr_ >= this.hi_;
+        if (bufferExhausted)
+        {
+            // test for EOF
+            if (this.hitEOF_)
+            {
+                return -1;
+            }
+
+            // slow path -- read another buffer
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+            {
+                return -1;
+            }
+        }
+        int available = (int) (this.hi_ - this.curr_);
+        int count = Math.min(len, available);
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(this.buffer_, buffOff, b, off, count);
+        this.curr_ += count;
+        return count;
+    }
+    
+    /**
+     * Writes a single byte at the current position into the buffer,
+     * advances by one and marks the buffer dirty.
+     */
+    public void write(int b) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            boolean roomAtEOF = this.hitEOF_ && (this.hi_ < this.maxHi_);
+            if (!roomAtEOF)
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_++;
+                }
+            }
+            else
+            {
+                // at EOF -- bump "hi"
+                this.hi_++;
+            }
+        }
+        int offset = (int) (this.curr_ - this.lo_);
+        this.buffer_[offset] = (byte) b;
+        this.curr_++;
+        this.dirty_ = true;
+    }
+    
+    /**
+     * Writes all of <code>b</code> at the current position.
+     */
+    public void write(byte[] b) throws IOException
+    {
+        write(b, 0, b.length);
+    }
+    
+    /**
+     * Writes <code>len</code> bytes of <code>b</code>, starting at
+     * <code>off</code>, by repeatedly calling writeAtMost() until
+     * everything has been consumed, then marks the buffer dirty.
+     */
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        int offset = off;
+        int remaining = len;
+        while (remaining > 0)
+        {
+            int written = this.writeAtMost(b, offset, remaining);
+            offset += written;
+            remaining -= written;
+        }
+        this.dirty_ = true;
+    }
+    
+    /*
+     * Write at most "len" bytes from "b", starting at position "off", into
+     * the buffer at the current file position, and return the number of
+     * bytes written.
+     *
+     * The buffer is marked dirty here, per chunk, rather than only by the
+     * caller after its loop: when the buffer fills up, the next call goes
+     * through seek(), which only flushes a buffer that is marked dirty.
+     */
+    private int writeAtMost(byte[] b, int off, int len) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_)
+            {
+                // at EOF -- bump "hi"
+                this.hi_ = this.maxHi_;
+            }
+            else
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_ = this.maxHi_;
+                }
+            }
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(b, off, this.buffer_, buffOff, len);
+        this.curr_ += len;
+        if (len > 0)
+        {
+            this.dirty_ = true;
+        }
+        return len;
+    }
+    
+    /**
+     * Ad-hoc driver: submits a single ReadImpl task to a one-thread
+     * ContinuationsExecutor.
+     */
+    public static void main(String[] args) throws Throwable
+    {
+        LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
+        ExecutorService es = new ContinuationsExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, taskQueue );
+        Runnable readTask = new ReadImpl();
+        es.execute(readTask);
+    }
+}
+
+/**
+ * Test task that opens /var/cassandra/test.dat in continuations mode,
+ * reads an int and a UTF string from the start of the file and again
+ * after seeking to offset 66000, printing what it reads. The file is
+ * closed even when one of the reads fails.
+ */
+class ReadImpl implements Runnable
+{
+    public void run()
+    {
+        AIORandomAccessFile aRaf2 = null;
+        try
+        {
+            System.out.println("About to start the whole thing ...");
+            aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"), true );
+            System.out.println("About to seek ...");
+
+            //aRaf2.seek(0L);
+            System.out.println( aRaf2.readInt() );
+            System.out.println( aRaf2.readUTF() );
+
+            System.out.println("About to seek a second time ...");
+            aRaf2.seek(66000L);
+            System.out.println( aRaf2.readInt() );
+            System.out.println( aRaf2.readUTF() );
+        }
+        catch( IOException ex )
+        {
+            ex.printStackTrace();
+        }
+        finally
+        {
+            /* do not leak the file when a read above throws */
+            if ( aRaf2 != null )
+            {
+                try
+                {
+                    aRaf2.close();
+                }
+                catch( IOException ex )
+                {
+                    ex.printStackTrace();
+                }
+            }
+        }
+    }
+}
+
+/**
+ * Test task that opens /var/cassandra/test.dat and writes 10000
+ * records, each an int followed by a UTF string. The file is closed
+ * even when one of the writes fails.
+ */
+class WriteImpl implements Runnable
+{
+    public void run()
+    {
+        AIORandomAccessFile aRaf2 = null;
+        try
+        {
+            aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"));
+            for ( int i = 0; i < 10000; ++i )
+            {
+                aRaf2.writeInt(32);
+                aRaf2.writeUTF("Avinash Lakshman thinks John McCain is an idiot");
+            }
+        }
+        catch( IOException ex )
+        {
+            ex.printStackTrace();
+        }
+        finally
+        {
+            /* do not leak the file when a write above throws */
+            if ( aRaf2 != null )
+            {
+                try
+                {
+                    aRaf2.close();
+                }
+                catch( IOException ex )
+                {
+                    ex.printStackTrace();
+                }
+            }
+        }
+    }
+}
+
+/**
+ * Write completion handler for AIO framework. The context
+ * that is passed in, is a Continuation that needs to be
+ * resumed on write completion. For now the continuation is
+ * not used at all.
+ * <p>
+ * Failures are logged here, since a flush that does not wait on the
+ * returned Future has no other place where they become visible.
+ * 
+ * @author alakshman
+ *
+ * @param <V> number of bytes written.
+ */
+class WriteCompletionHandler<V> implements CompletionHandler<V, Continuation>
+{
+    private final static Logger logger_ = Logger.getLogger(WriteCompletionHandler.class);
+    
+    public void cancelled(Continuation attachment)
+    {
+    }
+    
+    public void completed(V result, Continuation attachment)
+    {               
+        logger_.debug("Bytes written " + result);
+        /* keep resuming for as long as continueWith() hands back a continuation */
+        while ( attachment != null )
+        {
+            attachment = Continuation.continueWith(attachment);
+        }
+    }
+    
+    public void failed(Throwable th, Continuation attachment)
+    {
+        /* do not drop write failures silently */
+        logger_.warn("Asynchronous write failed", th);
+    }
+}
+
+

Added: incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.Arrays;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A <code>BufferedRandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access.
+ * <P>
+ * 
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class BufferedRandomAccessFile extends RandomAccessFile
+{
+    private static final Logger logger_ = Logger.getLogger(BufferedRandomAccessFile.class);
+    static final int LogBuffSz_ = 16; // 64K buffer
+    public static final int BuffSz_ = (1 << LogBuffSz_);
+    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+    
+    /*
+     * This implementation is based on the buffer implementation in Modula-3's
+     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+     */
+    private boolean dirty_; // true iff unflushed bytes exist
+    private boolean closed_; // true iff the file is closed
+    private long curr_; // current position in file
+    private long lo_, hi_; // bounds on characters in "buff"
+    private byte[] buff_; // local buffer
+    private long maxHi_; // this.lo + this.buff.length
+    private boolean hitEOF_; // buffer contains last file block?
+    private long diskPos_; // disk position
+    
+    /*
+     * To describe the above fields, we introduce the following abstractions for
+     * the file "f":
+     * 
+     * len(f) -- the length of the file
+     * curr(f) -- the current position in the file
+     * c(f) -- the abstract contents of the file
+     * disk(f) -- the contents of f's backing disk file
+     * closed(f) -- true iff the file is closed
+     * 
+     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+     * operation has the effect of making "disk(f)" identical to "c(f)".
+     * 
+     * A file is said to be *valid* if the following conditions hold:
+     * 
+     * V1. The "closed" and "curr" fields are correct:
+     * 
+     * f.closed == closed(f) f.curr == curr(f)
+     * 
+     * V2. The current position is either contained in the buffer, or just past
+     * the buffer:
+     * 
+     * f.lo <= f.curr <= f.hi
+     * 
+     * V3. Any (possibly) unflushed characters are stored in "f.buff":
+     * 
+     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+     * 
+     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+     * 
+     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+     * disk(f)[i])
+     * 
+     * V5. "f.dirty" is true iff the buffer contains bytes that should be
+     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+     * 
+     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+     * 
+     * V6. this.maxHi == this.lo + this.buff.length
+     * 
+     * Note that "f.buff" can be "null" in a valid file, since the range of
+     * characters in V3 is empty when "f.lo == f.curr".
+     * 
+     * A file is said to be *ready* if the buffer contains the current position,
+     * i.e., when:
+     * 
+     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+     * 
+     * When a file is ready, reading or writing a single byte can be performed
+     * by reading or writing the in-memory buffer without performing a disk
+     * operation.
+     */
+    
+    /**
+     * Opens <code>file</code> in mode <code>mode</code> ("r" for reading
+     * only, "rw" for reading and writing) and sets up a buffer of the
+     * default size, <code>BuffSz_</code> bytes.
+     */
+    public BufferedRandomAccessFile(File file, String mode) throws IOException
+    {
+        super(file, mode);
+        init(0);
+    }
+    
+    /**
+     * Same as the (File, String) constructor, but with a caller supplied
+     * buffer size. A <code>size</code> that does not exceed
+     * <code>BuffSz_</code> falls back to <code>BuffSz_</code>.
+     */
+    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException
+    {
+        super(file, mode);
+        init(size);
+    }
+    
+    /**
+     * Opens the file named <code>name</code> in mode <code>mode</code>
+     * ("r" for reading only, "rw" for reading and writing) and sets up a
+     * buffer of the default size, <code>BuffSz_</code> bytes.
+     */
+    public BufferedRandomAccessFile(String name, String mode) throws IOException
+    {
+        super(name, mode);
+        init(0);
+    }
+    
+    /**
+     * Same as the (String, String) constructor, but with a caller supplied
+     * buffer size. A <code>size</code> that does not exceed
+     * <code>BuffSz_</code> falls back to <code>BuffSz_</code>.
+     */
+    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
+    {
+        super(name, mode);
+        init(size);
+    }
+    
+    /*
+     * Reset all buffer bookkeeping and allocate the buffer. The buffer is
+     * "size" bytes when that exceeds BuffSz_, otherwise BuffSz_ bytes.
+     */
+    private void init(int size)
+    {
+        this.dirty_ = this.closed_ = false;
+        this.lo_ = this.curr_ = this.hi_ = 0;
+        this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
+        // V6: maxHi == lo + buff.length. Use the real buffer length so the
+        // invariant also holds when a larger-than-default buffer is requested.
+        this.maxHi_ = (long) this.buff_.length;
+        this.hitEOF_ = false;
+        this.diskPos_ = 0L;
+    }
+    
+    /**
+     * Flush any buffered writes and close the underlying file. The file
+     * is closed even when the flush fails, so the descriptor is never
+     * leaked; the flush failure is still reported to the caller.
+     */
+    public void close() throws IOException
+    {
+        try
+        {
+            this.flush();
+        }
+        finally
+        {
+            this.closed_ = true;
+            super.close();
+        }
+    }
+    
+    /**
+     * Write any buffered bytes that have not yet reached the disk.
+     * Delegates to flushBuffer(), which does nothing when the buffer
+     * is not dirty.
+     */
+    public void flush() throws IOException
+    {
+        flushBuffer();
+    }
+    
+    /*
+     * Write the bytes in [lo, curr) from the buffer to disk when the buffer
+     * is dirty, repositioning the disk pointer first if necessary.
+     */
+    private void flushBuffer() throws IOException
+    {
+        if (!dirty_)
+            return;
+        if (diskPos_ != lo_)
+            super.seek(lo_);
+        super.write(buff_, 0, (int) (curr_ - lo_));
+        diskPos_ = curr_;
+        dirty_ = false;
+    }
+    
+    /*
+     * Read at most "this.buff.length" bytes into "this.buff", returning the
+     * number of bytes read. If the return result is less than
+     * "this.buff.length", then EOF was read and "hitEOF" is set.
+     */
+    private int fillBuffer() throws IOException
+    {
+        int cnt = 0;
+        int rem = this.buff_.length;
+        while (rem > 0)
+        {
+            int n = super.read(this.buff_, cnt, rem);
+            if (n < 0)
+                break;
+            cnt += n;
+            rem -= n;
+        }
+        // "cnt" is never negative, so the former "(cnt < 0) && ..." guard
+        // could not fire and hitEOF was never recorded. Record it directly.
+        this.hitEOF_ = (cnt < this.buff_.length);
+        if (this.hitEOF_)
+        {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
+        }
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+    
+    /*
+     * Position <code>this.curr</code> at <code>pos</code>.<p>
+     *
+     * When <code>pos</code> is outside [lo, hi) the dirty bytes are flushed,
+     * the buffer is re-based at <code>pos</code> rounded down to a BuffSz
+     * boundary and refilled from disk. When <code>pos</code> is inside the
+     * buffer nothing is read; a flush happens only for a backwards seek.
+     */
+    public void seek(long pos) throws IOException
+    {
+        boolean insideBuffer = (pos >= lo_) && (pos < hi_);
+        if (insideBuffer)
+        {
+            // no read required; a backwards seek must flush to maintain V4
+            if (pos < curr_)
+                flushBuffer();
+        }
+        else
+        {
+            // leaving the current buffer -- flush it and load the right one
+            flushBuffer();
+            lo_ = pos & BuffMask_; // start at BuffSz boundary
+            maxHi_ = lo_ + (long) buff_.length;
+            if (diskPos_ != lo_)
+            {
+                super.seek(lo_);
+                diskPos_ = lo_;
+            }
+            hi_ = lo_ + (long) fillBuffer();
+        }
+        curr_ = pos;
+    }
+    
+    /** Returns the current logical position, i.e. the buffered <code>curr</code>. */
+    public long getFilePointer()
+    {
+        return curr_;
+    }
+    
+    public long length() throws IOException
+    {
+        return Math.max(this.curr_, super.length());
+    }
+    
+    /**
+     * Reads one byte from the buffer, loading the next buffer when the
+     * current one is exhausted.
+     * @return the byte as a value in 0..255, or -1 at end of file.
+     */
+    public int read() throws IOException
+    {
+        if (curr_ >= hi_)
+        {
+            // buffer exhausted: either we already know this is EOF ...
+            if (hitEOF_)
+                return -1;
+            // ... or we load the buffer that holds "curr"
+            seek(curr_);
+            if (curr_ == hi_)
+                return -1;
+        }
+        int value = buff_[(int) (curr_ - lo_)] & 0xFF; // byte -> unsigned int
+        curr_++;
+        return value;
+    }
+    
+    /** Equivalent to <code>read(b, 0, b.length)</code>. */
+    public int read(byte[] b) throws IOException
+    {
+        return read(b, 0, b.length);
+    }
+    
+    /**
+     * Copies up to <code>len</code> bytes into <code>b</code> starting at
+     * <code>off</code>. At most the bytes remaining in the current buffer
+     * are returned, so the result may be smaller than <code>len</code>.
+     * @return number of bytes copied, or -1 at end of file.
+     */
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        if (curr_ >= hi_)
+        {
+            // buffer exhausted: either we already know this is EOF ...
+            if (hitEOF_)
+                return -1;
+            // ... or we load the buffer that holds "curr"
+            seek(curr_);
+            if (curr_ == hi_)
+                return -1;
+        }
+        int available = (int) (hi_ - curr_);
+        int count = (len < available) ? len : available;
+        System.arraycopy(buff_, (int) (curr_ - lo_), b, off, count);
+        curr_ += count;
+        return count;
+    }
+    
+    public void write(int b) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_)
+            {
+                // at EOF -- bump "hi"
+                this.hi_++;
+            }
+            else
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_++;
+                }
+            }
+        }
+        this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+        this.curr_++;
+        this.dirty_ = true;
+    }
+    
+    /** Equivalent to <code>write(b, 0, b.length)</code>. */
+    public void write(byte[] b) throws IOException
+    {
+        write(b, 0, b.length);
+    }
+    
+    /**
+     * Writes <code>len</code> bytes of <code>b</code>, starting at
+     * <code>off</code>, through the buffer. The data is copied in pieces of
+     * at most one buffer each (see writeAtMost).
+     */
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        while (len > 0)
+        {
+            int n = this.writeAtMost(b, off, len);
+            off += n;
+            len -= n;
+            // Mark the buffer dirty after every piece. The next piece may
+            // call seek(), which only flushes a dirty buffer; setting the
+            // flag after the loop would drop the piece just copied.
+            this.dirty_ = true;
+        }
+    }
+    
+    /*
+     * Copy at most "len" bytes from "b", starting at position "off", into
+     * the buffer and return the number of bytes copied. The copy never
+     * goes past "hi". This method does not touch the dirty flag.
+     */
+    private int writeAtMost(byte[] b, int off, int len) throws IOException
+    {
+        if (curr_ >= hi_)
+        {
+            // at a known EOF with room left we can open up the whole buffer
+            boolean extend = hitEOF_ && hi_ < maxHi_;
+            if (!extend)
+            {
+                // slow path -- write current buffer; read next one
+                seek(curr_);
+                // appending to EOF also opens up the whole buffer
+                extend = (curr_ == hi_);
+            }
+            if (extend)
+                hi_ = maxHi_;
+        }
+        int room = (int) (hi_ - curr_);
+        int count = (len < room) ? len : room;
+        System.arraycopy(b, off, buff_, (int) (curr_ - lo_), count);
+        curr_ += count;
+        return count;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.zip.Adler32;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import bak.pcj.map.AbstractLongKeyLongMap;
+import bak.pcj.map.LongKeyLongChainedHashMap;
+
+/**
+ * This class manages the persistence of checksums and keeps
+ * them in memory. It maintains a mapping of data files on
+ * disk to their corresponding checksum files. It also
+ * loads the checksums into memory on start up.
+ * 
+ * @author alakshman
+ *
+ */
+class ChecksumManager
+{    
+    private static Logger logger_ = Logger.getLogger(ChecksumManager.class);
+    /* Keeps a mapping of checksum manager instances to data file */
+    private static Map<String, ChecksumManager> chksumMgrs_ = new HashMap<String, ChecksumManager>();
+    private static Lock lock_ = new ReentrantLock();
+    private static final String checksumPrefix_ = "Checksum-";
+    private static final int bufferSize_ = 8*1024*1024;
+    private static final long chunkMask_ = 0x00000000FFFFFFFFL;
+    private static final long fileIdMask_ = 0x7FFFFFFF00000000L;
+    /* Map where checksums are cached. */
+    private static AbstractLongKeyLongMap chksums_ = new LongKeyLongChainedHashMap();
+
+    /**
+     * Returns the checksum manager for the given data file, creating and
+     * registering it on first use. The lookup and the insert happen under
+     * lock_ so that only one manager is ever created per data file.
+     *
+     * @param dataFile data file whose manager is wanted.
+     * @throws IOException if the manager cannot be created.
+     */
+    public static ChecksumManager instance(String dataFile) throws IOException
+    {
+        lock_.lock();
+        try
+        {
+            /* Look up under the lock: the map is a plain HashMap. */
+            ChecksumManager chksumMgr = chksumMgrs_.get(dataFile);
+            if ( chksumMgr == null )
+            {
+                chksumMgr = new ChecksumManager(dataFile);
+                chksumMgrs_.put(dataFile, chksumMgr);
+            }
+            return chksumMgr;
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+    
+    /* TODO: Debug only */
+    /**
+     * Debug variant of instance(String) that takes the checksum file
+     * explicitly. The lookup and the insert happen under lock_ so that
+     * only one manager is ever created per data file.
+     *
+     * @param dataFile data file whose manager is wanted.
+     * @param chkSumFile checksum file to use for a newly created manager.
+     * @throws IOException if the manager cannot be created.
+     */
+    public static ChecksumManager instance(String dataFile, String chkSumFile) throws IOException
+    {
+        lock_.lock();
+        try
+        {
+            /* Look up under the lock: the map is a plain HashMap. */
+            ChecksumManager chksumMgr = chksumMgrs_.get(dataFile);
+            if ( chksumMgr == null )
+            {
+                chksumMgr = new ChecksumManager(dataFile, chkSumFile);
+                chksumMgrs_.put(dataFile, chksumMgr);
+            }
+            return chksumMgr;
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+    }
+    
+    
+    /**
+     * On start read all the check sum files on disk and
+     * pull them into memory. A file is treated as a check sum
+     * file when its name contains the check sum prefix. The
+     * values of a file are registered under chunk ids 1, 2, ...
+     * in the order in which they are stored.
+     * @throws IOException
+     */
+    public static void onStart() throws IOException
+    {
+        String[] directories = DatabaseDescriptor.getAllDataFileLocations(); 
+        List<File> allFiles = new ArrayList<File>();
+        for ( String directory : directories )
+        {
+            File[] files = new File(directory).listFiles();
+            if ( files == null )
+            {
+                /* listFiles() returns null when the path cannot be listed. */
+                logger_.warn("Unable to list data directory " + directory);
+                continue;
+            }
+            for ( File f : files )
+            {
+                if ( f.getName().contains(ChecksumManager.checksumPrefix_) )
+                {
+                    allFiles.add(f);
+                }
+            }
+        }
+        
+        for ( File file : allFiles )
+        {                           
+            int fId = SequenceFile.getFileId(file.getName());
+            ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length());
+            try
+            {
+                int chunk = 0;
+                while ( !chksumRdr.isEOF() )
+                {
+                    long value = chksumRdr.readLong();
+                    long key = ChecksumManager.key(fId, ++chunk);
+                    chksums_.put(key, value);
+                }
+            }
+            finally
+            {
+                /* The values are copied into chksums_, so release the mapping. */
+                chksumRdr.close();
+            }
+        }
+    }
+    
+    /**
+     * On delete of this dataFile remove the checksums associated with
+     * this file from memory, remove the check sum manager instance,
+     * close its check sum file and delete that file from disk.
+     * 
+     * NOTE(review): the cached entries removed here are chunk ids
+     * 0 .. (size >> 16) - 1, whereas onStart() registers ids starting
+     * at 1 -- confirm which numbering the callers of logChecksum() use.
+     * 
+     * @param dataFile data file that is being deleted.
+     * @throws IOException
+     */
+    public static void onFileDelete(String dataFile) throws IOException
+    {
+        File f = new File(dataFile);
+        long size = f.length();
+        int fileId = SequenceFile.getFileId(f.getName());
+        int chunks = (int)(size >> 16L);
+        
+        for ( int i = 0; i < chunks; ++i )
+        {
+            long key = ChecksumManager.key(fileId, i);
+            chksums_.remove(key);
+        }
+        
+        /* remove the check sum manager instance, under the same lock instance() uses */
+        ChecksumManager chksumMgr;
+        lock_.lock();
+        try
+        {
+            chksumMgr = chksumMgrs_.remove(dataFile);
+        }
+        finally
+        {
+            lock_.unlock();
+        }
+        
+        String chksumFile = f.getParent() + System.getProperty("file.separator") + checksumPrefix_ + fileId + ".db";
+        try
+        {
+            /* release the manager's handle on the check sum file before deleting it */
+            if ( chksumMgr != null )
+            {
+                chksumMgr.raf_.close();
+            }
+        }
+        finally
+        {
+            FileUtils.delete(chksumFile);
+        }
+    }
+    
+    /**
+     * Builds the cache key for a chunk: the file id occupies the upper
+     * 32 bits and the chunk id the lower 32 bits.
+     * @param fileId id associated with the file.
+     * @param chunkId chunk within the file.
+     * @return combined 64 bit key.
+     */
+    private static long key(int fileId, int chunkId)
+    {
+        /*
+         * Mask the chunk id so that a negative value is not sign extended
+         * into the upper half, where it would overwrite the file id.
+         */
+        return ( ((long) fileId) << 32 ) | ( chunkId & chunkMask_ );
+    }
+    
+    private RandomAccessFile raf_;
+    private Adler32 adler_ = new Adler32();
+    
+    /**
+     * Opens, in "rw" mode, the check sum file that belongs to the given
+     * data file. The check sum file lives in the data file's directory
+     * and is named from the check sum prefix, the file id and ".db".
+     * 
+     * @param dataFile data file for which checksums are managed.
+     * @throws IOException
+     */
+    ChecksumManager(String dataFile) throws IOException
+    {
+        File data = new File(dataFile);
+        String parent = data.getParent();
+        short fileId = SequenceFile.getFileId(data.getName());
+        String separator = System.getProperty("file.separator");
+        raf_ = new RandomAccessFile(parent + separator + checksumPrefix_ + fileId + ".db", "rw");
+    }
+    
+    /*
+     * TODO: Remove later.
+     * Debug constructor: opens the given check sum file in "rw" mode and
+     * loads every value it holds into memory under chunk ids 1, 2, ...
+     */
+    ChecksumManager(String dataFile, String chkSumFile) throws IOException
+    {
+        short fId = SequenceFile.getFileId(new File(dataFile).getName());
+        raf_ = new RandomAccessFile(chkSumFile, "rw");
+        
+        File file = new File(chkSumFile);
+        ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length());
+        try
+        {
+            int chunk = 0;
+            while ( !chksumRdr.isEOF() )
+            {
+                long value = chksumRdr.readLong();
+                long key = ChecksumManager.key(fId, ++chunk);
+                chksums_.put(key, value);
+            }
+        }
+        finally
+        {
+            /* The values are copied into chksums_, so release the mapping. */
+            chksumRdr.close();
+        }
+    }
+    
+    /**
+     * Log the checksum for the the specified file and chunk
+     * within the file.
+     * @param fileId id associated with the file
+     * @param chunkId chunk within the file.
+     * @param buffer for which the checksum needs to be calculated.
+     * @throws IOException
+     */
+    void logChecksum(int fileId, int chunkId, byte[] buffer)
+    {
+        logChecksum(fileId, chunkId, buffer, 0, buffer.length);
+    }
+    
+    /**
+     * Log the checksum for the the specified file and chunk
+     * within the file.
+     * @param fileId id associated with the file
+     * @param chunkId chunk within the file.
+     * @param buffer for which the checksum needs to be calculated.
+     * @param startoffset offset to start within the buffer
+     * @param length size of the checksum buffer.
+     * @throws IOException
+     */
+    void logChecksum(int fileId, int chunkId, byte[] buffer, int startOffset, int length)
+    {
+        try
+        {            
+            adler_.update(buffer, startOffset, length);
+            long chksum = adler_.getValue();
+            adler_.reset();
+            /* log checksums to disk */
+            raf_.writeLong(chksum);
+            /* add the chksum to memory */
+            long key = ChecksumManager.key(fileId, chunkId);
+            chksums_.put(key, chksum);
+        }
+        catch ( IOException ex )
+        {
+            logger_.warn( LogUtil.throwableToString(ex) );
+        }
+    }
+    
+    /**
+     * Validate checksums for the data in the buffer.
+     * @file name of the file from which data is being
+     *       read.
+     * @chunkId chunkId
+     * @param buffer with data for which checksum needs to be 
+     *        verified.
+     * @throws IOException
+     */
+    void validateChecksum(String file, int chunkId, byte[] buffer) throws IOException
+    {                
+        validateChecksum(file, chunkId, buffer, 0, buffer.length);
+    }
+    
+    /**
+     * Validate the checksum of a region of the buffer against the
+     * value cached in memory for the given file and chunk.
+     * @param file name of the file from which data is being
+     *        read.
+     * @param chunkId chunk within the file.
+     * @param buffer with data for which checksum needs to be 
+     *        verified.
+     * @param startOffset within the buffer
+     * @param length of the data whose checksum needs to be verified.
+     * @throws IOException if the computed and cached checksums differ.
+     */
+    void validateChecksum(String file, int chunkId, byte[] buffer, int startOffset, int length) throws IOException
+    {
+        long key = ChecksumManager.key(SequenceFile.getFileId(file), chunkId);
+        /* compute the checksum and leave the shared Adler32 ready for reuse */
+        adler_.update(buffer, startOffset, length);
+        long computed = adler_.getValue();
+        adler_.reset();
+        long recorded = chksums_.get(key);
+        if ( computed == recorded )
+        {
+            return;
+        }
+        throw new IOException("Checksums do not match in file " + file + " for chunk " + chunkId + ".");
+    }
+    
+    
+    /**
+     * Look up the cached checksum for the specified file's chunk.
+     * @param fileId id associated with the file.
+     * @param chunkId chunk within the file.
+     * @return value held in the in-memory map for that chunk.
+     */
+    long getChecksum(int fileId, int chunkId)
+    {
+        return chksums_.get(ChecksumManager.key(fileId, chunkId));
+    }
+    
+    /**
+     * Debug helper: prints every checksum stored in a check sum file.
+     * @param args optional; args[0] is the check sum file to dump. When
+     *        absent the historical hard coded path is used.
+     */
+    public static void main(String[] args) throws Throwable
+    {
+        String chksumFile = ( args.length > 0 ) ? args[0] : "C:\\Engagements\\Cassandra\\Checksum-1.db";
+        ChecksumReader rdr = new ChecksumReader(chksumFile);
+        try
+        {
+            while ( !rdr.isEOF() )
+            {
+                System.out.println(rdr.readLong());
+            }
+        }
+        finally
+        {
+            /* release the mapping even when reading fails */
+            rdr.close();
+        }
+    }
+}
+
+/**
+ * ChecksumReader is used to memory map the checksum files and
+ * read their contents as a sequence of longs.
+ * 
+ * @author alakshman
+ *
+ */
+class ChecksumReader 
+{
+    private static Logger logger_ = Logger.getLogger(ChecksumReader.class);
+    /* Size in bytes of one stored checksum. */
+    private static final int entrySize_ = 8;
+    private String filename_;
+    private MappedByteBuffer buffer_;
+
+    /**
+     * Maps the whole of the named file.
+     * @param filename check sum file to read.
+     */
+    ChecksumReader(String filename) throws IOException
+    {
+        filename_ = filename;
+        File f = new File(filename);
+        map(0, f.length());
+    }
+    
+    /**
+     * Maps the region [start, end) of the named file.
+     * @param filename check sum file to read.
+     * @param start offset of the first byte to map.
+     * @param end offset just past the last byte to map; 0 means
+     *        the end of the file.
+     */
+    ChecksumReader(String filename, long start, long end) throws IOException
+    {        
+        filename_ = filename;
+        map(start, end);
+    }
+
+    /** Maps the whole file read-only and loads it into memory. */
+    public void map() throws IOException
+    {
+        RandomAccessFile file = new RandomAccessFile(filename_, "rw");
+        try
+        {
+            buffer_ = file.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, file.length() );
+            buffer_.load();
+        }
+        finally
+        {
+            file.close();
+        }
+    }
+
+    /**
+     * Maps the region [start, end) of the file read-only and loads it
+     * into memory.
+     * @param start offset of the first byte to map.
+     * @param end offset just past the last byte to map; 0 means
+     *        the end of the file.
+     * @throws IllegalArgumentException if start or end is negative or
+     *         end is smaller than start.
+     */
+    public void map(long start, long end) throws IOException
+    {
+        if ( start < 0 || end < 0 || end < start )
+            throw new IllegalArgumentException("Invalid values for start and end.");
+
+        RandomAccessFile file = new RandomAccessFile(filename_, "rw");
+        try
+        {
+            if ( end == 0 )
+                end = file.length();
+            /*
+             * FileChannel.map() takes a position and a size, not an end
+             * offset, so the region length has to be passed here.
+             */
+            buffer_ = file.getChannel().map(FileChannel.MapMode.READ_ONLY, start, end - start);
+            buffer_.load();
+        }
+        finally
+        {
+            file.close();
+        }
+    }
+
+    /**
+     * Releases the mapping behind the given buffer by invoking its
+     * cleaner reflectively. Any failure is logged at debug level.
+     */
+    void unmap(final Object buffer)
+    {
+        AccessController.doPrivileged( new PrivilegedAction<MappedByteBuffer>()
+                {
+            public MappedByteBuffer run()
+            {
+                try
+                {
+                    Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
+                    getCleanerMethod.setAccessible(true);
+                    sun.misc.Cleaner cleaner = (sun.misc.Cleaner)getCleanerMethod.invoke(buffer,new Object[0]);
+                    cleaner.clean();
+                }
+                catch(Throwable e)
+                {
+                    logger_.debug( LogUtil.throwableToString(e) );
+                }
+                return null;
+            }
+                });
+    }
+    
+    /**
+     * Reads the next checksum.
+     * @throws IOException if fewer than eight bytes remain, i.e. the
+     *         file ends in a partial entry.
+     */
+    public long readLong() throws IOException
+    {
+        if ( buffer_.remaining() < entrySize_ )
+        {
+            throw new IOException("Truncated checksum entry at end of file " + filename_ + ".");
+        }
+        return buffer_.getLong();
+    }
+    
+    /** Returns true when no bytes remain in the mapped region. */
+    public boolean isEOF()
+    {
+        return ( buffer_.remaining() == 0 );
+    }
+
+    /**
+     * Releases the mapping. The reference is dropped so that the
+     * unmapped memory can no longer be touched through this reader;
+     * calling close() again is a no-op.
+     */
+    public void close() throws IOException
+    {
+        if ( buffer_ != null )
+        {
+            unmap(buffer_);
+            buffer_ = null;
+        }
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+
+/**
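+ * A <code>ChecksumRandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access. Checksums of the buffered data
+ * are logged when the buffer is flushed and validated when it is refilled.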
+ * <P>
+ * 
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class ChecksumRandomAccessFile extends RandomAccessFile
+{ 
+    /*
+     * Selects what doChecksumOperation() does with each checksum chunk:
+     * LOG records a checksum through the ChecksumManager, VERIFY validates
+     * the chunk against it.
+     */
+    private static enum ChecksumOperations
+    {
+        LOG,
+        VERIFY
+    }
+    
+    static final int LogBuffSz_ = 16; // log2 of the default buffer size (2^16 = 64K)
+    private static final int checksumSz_ = (1 << LogBuffSz_); // bytes covered by one checksum: 64K
+    public static final int BuffSz_ = (1 << LogBuffSz_); // default (and minimum) buffer size: 64K
+    static final long BuffMask_ = ~(((long) BuffSz_) - 1L); // clears the low LogBuffSz_ bits, i.e. rounds down to a BuffSz_ boundary
+    
+    /*
+     * This implementation is based on the buffer implementation in Modula-3's
+     * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+     */
+    private boolean dirty_; // true iff unflushed bytes exist
+    private boolean closed_; // true iff the file is closed
+    private long curr_; // current position in file
+    private long lo_, hi_; // bounds on characters in "buff": file offsets of its first byte and one past its last valid byte
+    private byte[] buff_; // local buffer
+    private long maxHi_; // this.lo + this.buff.length
+    private boolean hitEOF_; // buffer contains last file block?
+    private long diskPos_; // disk position, i.e. the offset of the underlying file as tracked by this class
+    private String filename_; // file name, used to look up the ChecksumManager and file id
+    
+    /*
+     * To describe the above fields, we introduce the following abstractions for
+     * the file "f":
+     * 
+     * len(f) the length of the file curr(f) the current position in the file
+     * c(f) the abstract contents of the file disk(f) the contents of f's
+     * backing disk file closed(f) true iff the file is closed
+     * 
+     * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+     * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+     * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+     * operation has the effect of making "disk(f)" identical to "c(f)".
+     * 
+     * A file is said to be *valid* if the following conditions hold:
+     * 
+     * V1. The "closed" and "curr" fields are correct:
+     * 
+     * f.closed == closed(f) f.curr == curr(f)
+     * 
+     * V2. The current position is either contained in the buffer, or just past
+     * the buffer:
+     * 
+     * f.lo <= f.curr <= f.hi
+     * 
+     * V3. Any (possibly) unflushed characters are stored in "f.buff":
+     * 
+     * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+     * 
+     * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+     * 
+     * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+     * disk(f)[i])
+     * 
+     * V5. "f.dirty" is true iff the buffer contains bytes that should be
+     * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+     * 
+     * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+     * 
+     * V6. this.maxHi == this.lo + this.buff.length
+     * 
+     * Note that "f.buff" can be "null" in a valid file, since the range of
+     * characters in V3 is empty when "f.lo == f.curr".
+     * 
+     * A file is said to be *ready* if the buffer contains the current position,
+     * i.e., when:
+     * 
+     * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+     * 
+     * When a file is ready, reading or writing a single byte can be performed
+     * by reading or writing the in-memory buffer without performing a disk
+     * operation.
+     */
+    
+    /**
+     * Open a new <code>ChecksumRandomAccessFile</code> on <code>file</code>
+     * in mode <code>mode</code>, which should be "r" for reading only, or
+     * "rw" for reading and writing. The default buffer size
+     * (<code>BuffSz_</code>) is used.
+     */
+    public ChecksumRandomAccessFile(File file, String mode) throws IOException
+    {
+        super(file, mode);
+        this.init(file.getAbsolutePath(), 0);
+    }
+    
+    /**
+     * Open a new <code>ChecksumRandomAccessFile</code> on <code>file</code>
+     * in mode <code>mode</code> with a requested buffer size of
+     * <code>size</code> bytes. A size that is not larger than
+     * <code>BuffSz_</code> results in a buffer of <code>BuffSz_</code> bytes.
+     */
+    public ChecksumRandomAccessFile(File file, String mode, int size) throws IOException
+    {
+        super(file, mode);
+        this.init(file.getAbsolutePath(), size);
+    }
+    
+    /**
+     * Open a new <code>ChecksumRandomAccessFile</code> on the file named
+     * <code>name</code> in mode <code>mode</code>, which should be "r" for
+     * reading only, or "rw" for reading and writing. The default buffer size
+     * (<code>BuffSz_</code>) is used.
+     */
+    public ChecksumRandomAccessFile(String name, String mode) throws IOException
+    {
+        super(name, mode);
+        this.init(name, 0);
+    }
+    
+    /**
+     * Open a new <code>ChecksumRandomAccessFile</code> on the file named
+     * <code>name</code> in mode <code>mode</code> with a requested buffer
+     * size of <code>size</code> bytes. A size that is not larger than
+     * <code>BuffSz_</code> results in a buffer of <code>BuffSz_</code> bytes.
+     */
+    public ChecksumRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
+    {
+        super(name, mode);
+        this.init(name, size);
+    }
+    
+    /**
+     * Puts the buffer bookkeeping into its initial state: an empty, clean
+     * buffer positioned at offset 0 of the file.
+     *
+     * @param name the file name, remembered for checksum lookups.
+     * @param size the requested buffer size in bytes; values not larger than
+     *             <code>BuffSz_</code> yield a buffer of <code>BuffSz_</code>
+     *             bytes.
+     */
+    private void init(String name, int size)
+    {
+        this.dirty_ = this.closed_ = false;
+        this.lo_ = this.curr_ = this.hi_ = 0;
+        this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
+        // Invariant V6: maxHi == lo + buff.length. lo is 0 here, so derive
+        // the bound from the buffer actually allocated rather than from
+        // BuffSz_, which is too small when a larger buffer was requested.
+        this.maxHi_ = (long) this.buff_.length;
+        this.hitEOF_ = false;
+        this.diskPos_ = 0L;
+        this.filename_ = name;
+    }
+    
+    /**
+     * Flushes any buffered bytes and closes the underlying file. The file is
+     * closed even when the flush fails, so that the descriptor is not leaked;
+     * the flush failure is still reported to the caller.
+     *
+     * @throws IOException if flushing or closing the file fails.
+     */
+    public void close() throws IOException
+    {
+        try
+        {
+            this.flush();
+        }
+        finally
+        {
+            // always release the descriptor, even if the flush threw
+            this.closed_ = true;
+            super.close();
+        }
+    }
+    
+    /**
+     * Writes out whatever dirty bytes the buffer currently holds by
+     * delegating to <code>flushBuffer()</code>. Nothing is written when the
+     * buffer is not dirty.
+     */
+    public void flush() throws IOException
+    {
+        flushBuffer();
+    }
+    
+    /**
+     * Walks the buffer in pieces of <code>checksumSz_</code> bytes and, for
+     * each piece, either logs its checksum or validates it through the
+     * <code>ChecksumManager</code> registered for this file, depending on
+     * <code>chksumOps</code>. The loop body always runs at least once and
+     * continues while the walked position is below the (adjusted) disk
+     * position.
+     *
+     * @param chksumOps LOG to record checksums, VERIFY to validate them.
+     */
+    private void doChecksumOperation(ChecksumOperations chksumOps) throws IOException
+    {        
+        int buffSz = buff_.length;
+        /*
+         * If the diskPos_ is at the buffer boundary then return 
+         * diskPos_ - 1 else return the actual diskPos_.
+        */        
+        long currentPosition = ( (diskPos_ % buffSz) == 0 ) ? diskPos_ - 1 : diskPos_;
+        /* Tells me which buffered chunk I am in (1-based). */
+        long chunk = (currentPosition / buffSz) + 1; 
+        /* Number of checksum chunks within a buffer */
+        int chksumChunks = buff_.length / checksumSz_;  
+        /* Position of the start of the previous buffer boundary */
+        long pos = (chunk == 0) ? 0 : (chunk - 1)*buffSz;
+        /* Offset into buff_ of the piece currently being checksummed */
+        int startOffset = 0;
+        /* Id of the first checksum chunk belonging to this buffered chunk (1-based) */
+        int chksumChunkId = (int)(chksumChunks*(chunk - 1) + 1);
+        do
+        {            
+            int fId = SequenceFile.getFileId(filename_);               
+            switch( chksumOps )
+            {
+                case LOG:                    
+                    ChecksumManager.instance(filename_).logChecksum(fId, chksumChunkId++, buff_, startOffset, checksumSz_);
+                    break;
+                case VERIFY:
+                    ChecksumManager.instance(filename_).validateChecksum(filename_, chksumChunkId++, buff_, startOffset, checksumSz_);
+                    break;
+            }
+            pos += checksumSz_;
+            startOffset += checksumSz_;
+        }
+        while ( pos < currentPosition );
+    }
+    
+    /**
+     * Flush any dirty bytes in the buffer to disk. When the buffer is dirty,
+     * the bytes in [lo, curr) are written at file offset lo, the disk
+     * position is advanced to curr, the dirty flag is cleared, and the
+     * checksums for the buffer are logged. Does nothing when the buffer is
+     * clean.
+    */
+    private void flushBuffer() throws IOException
+    {
+        if (this.dirty_)
+        {
+            // reposition the underlying file only if it is not already at lo
+            if (this.diskPos_ != this.lo_)
+                super.seek(this.lo_);
+            int len = (int) (this.curr_ - this.lo_);            
+            super.write(this.buff_, 0, len);
+            this.diskPos_ = this.curr_;
+            this.dirty_ = false;
+            /* log the checksum of the data; note this runs after the bytes were written to the file */
+            doChecksumOperation(ChecksumOperations.LOG);
+        }
+    }
+    
+    /**
+     * Read at most "this.buff.length" bytes into "this.buff", returning the
+     * number of bytes read. If the return result is less than
+     * "this.buff.length", then EOF was read.
+     */
+    private int fillBuffer() throws IOException
+    {
+        int cnt = 0;
+        int rem = this.buff_.length;
+        int n = -1;
+        while (rem > 0)
+        {
+            n = super.read(this.buff_, cnt, rem);
+            if (n < 0)
+                break;
+            cnt += n;
+            rem -= n;
+        }
+        if (this.hitEOF_ = (cnt < this.buff_.length))
+        {
+            // make sure buffer that wasn't read is initialized with -1
+            Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);             
+        }                
+        this.diskPos_ += cnt;
+        return cnt;
+    }
+    
+    /**
+     * This method positions <code>this.curr</code> at position <code>pos</code>.
+     * If <code>pos</code> does not fall in the current buffer, it flushes the
+     * current buffer and loads the correct one.<p>
+     * 
+     * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
+     * is at or past the end-of-file, which can only happen if the file was
+     * opened in read-only mode.
+     */
+    public void seek(long pos) throws IOException
+    {
+        if (pos >= this.hi_ || pos < this.lo_)
+        {
+            // seeking outside of current buffer -- flush and read
+            this.flushBuffer();
+            this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+            this.maxHi_ = this.lo_ + (long) this.buff_.length;
+            // move the underlying file only when it is not already at lo
+            if (this.diskPos_ != this.lo_)
+            {
+                super.seek(this.lo_);
+                this.diskPos_ = this.lo_;
+            }
+            int n = this.fillBuffer();
+            // validate checksums only when some bytes were actually read
+            if ( n > 0 )
+            {
+                doChecksumOperation(ChecksumOperations.VERIFY);
+            }
+            this.hi_ = this.lo_ + (long) n;
+        }
+        else
+        {
+            // seeking inside current buffer -- no read required
+            if (pos < this.curr_)
+            {
+                // if seeking backwards, we must flush to maintain V4
+                this.flushBuffer();
+            }
+        }
+        this.curr_ = pos;
+    }
+    
+    /**
+     * Returns the current logical position in the file as tracked by this
+     * class (<code>curr</code>), not the offset of the underlying file.
+     */
+    public long getFilePointer()
+    {
+        final long position = this.curr_;
+        return position;
+    }
+    
+    /**
+     * Returns the larger of the current position and the length reported by
+     * the underlying file, so that bytes written into the buffer but not yet
+     * flushed are counted.
+     */
+    public long length() throws IOException
+    {
+        final long position = this.curr_;
+        final long onDisk = super.length();
+        return (position >= onDisk) ? position : onDisk;
+    }
+    
+    /**
+     * Reads a single byte at the current position and advances the position
+     * by one.
+     *
+     * @return the byte as an int in the range 0..255, or -1 at end of file.
+     */
+    public int read() throws IOException
+    {
+        final boolean pastBuffer = (this.curr_ >= this.hi_);
+        if (pastBuffer && this.hitEOF_)
+        {
+            // the buffer already holds the last block of the file
+            return -1;
+        }
+        if (pastBuffer)
+        {
+            // slow path -- load the buffer that contains the current position
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        final int slot = (int) (this.curr_ - this.lo_);
+        final int unsigned = this.buff_[slot] & 0xFF; // byte -> int without sign extension
+        this.curr_++;
+        return unsigned;
+    }
+    
+    /**
+     * Reads up to <code>b.length</code> bytes into <code>b</code>; equivalent
+     * to <code>read(b, 0, b.length)</code>.
+     */
+    public int read(byte[] b) throws IOException
+    {
+        final int capacity = b.length;
+        return this.read(b, 0, capacity);
+    }
+    
+    /**
+     * Copies up to <code>len</code> bytes from the buffer into <code>b</code>
+     * starting at <code>off</code>, and advances the current position by the
+     * number of bytes copied. At most the bytes remaining in the current
+     * buffer are returned by one call.
+     *
+     * @return the number of bytes copied, or -1 at end of file.
+     */
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        final boolean pastBuffer = (this.curr_ >= this.hi_);
+        if (pastBuffer && this.hitEOF_)
+        {
+            // the buffer already holds the last block of the file
+            return -1;
+        }
+        if (pastBuffer)
+        {
+            // slow path -- load the buffer that contains the current position
+            this.seek(this.curr_);
+            if (this.curr_ == this.hi_)
+                return -1;
+        }
+        final int buffered = (int) (this.hi_ - this.curr_);
+        final int count = (len <= buffered) ? len : buffered;
+        final int from = (int) (this.curr_ - this.lo_);
+        System.arraycopy(this.buff_, from, b, off, count);
+        this.curr_ += count;
+        return count;
+    }
+    
+    /**
+     * Writes the low-order byte of <code>b</code> into the buffer at the
+     * current position, advances the position by one and marks the buffer
+     * dirty. When the position is at or past <code>hi</code>, the valid range
+     * is first extended by one byte, either directly (buffer at EOF with room
+     * left) or after seeking to load the buffer for the current position.
+     */
+    public void write(int b) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_)
+            {
+                // at EOF -- bump "hi"
+                this.hi_++;
+            }
+            else
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_++;
+                }
+            }
+        }
+        this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+        this.curr_++;
+        this.dirty_ = true;
+    }
+    
+    /**
+     * Writes all of <code>b</code>; equivalent to
+     * <code>write(b, 0, b.length)</code>.
+     */
+    public void write(byte[] b) throws IOException
+    {
+        final int total = b.length;
+        this.write(b, 0, total);
+    }
+    
+    /**
+     * Writes <code>len</code> bytes from <code>b</code>, starting at
+     * <code>off</code>, at the current position. The data is copied into the
+     * buffer one buffer-sized piece at a time.
+     *
+     * @param b   the source array.
+     * @param off the index in <code>b</code> of the first byte to write.
+     * @param len the number of bytes to write.
+     * @throws IOException if flushing or reloading the buffer fails.
+     */
+    public void write(byte[] b, int off, int len) throws IOException
+    {
+        while (len > 0)
+        {
+            int n = this.writeAtMost(b, off, len);
+            off += n;
+            len -= n;
+            // Mark the buffer dirty after every piece, not once at the end:
+            // the next writeAtMost() may flush the buffer to make room, and
+            // the flush is skipped when the buffer is not marked dirty, which
+            // would silently drop the piece just copied.
+            this.dirty_ = true;
+        }
+    }
+    
+    /*
+     * Write at most "len" bytes from "b", starting at position "off" in "b",
+     * into the buffer at the current file position, and return the number of
+     * bytes written. The count is limited by the room left in the buffer
+     * ("hi" - "curr"). This method does not set the dirty flag itself.
+     */
+    private int writeAtMost(byte[] b, int off, int len) throws IOException
+    {
+        if (this.curr_ >= this.hi_)
+        {
+            if (this.hitEOF_ && this.hi_ < this.maxHi_)
+            {
+                // at EOF -- bump "hi"
+                this.hi_ = this.maxHi_;
+            }
+            else
+            {
+                // slow path -- write current buffer; read next one
+                this.seek(this.curr_);
+                if (this.curr_ == this.hi_)
+                {
+                    // appending to EOF -- bump "hi"
+                    this.hi_ = this.maxHi_;
+                }
+            }
+        }
+        len = Math.min(len, (int) (this.hi_ - this.curr_));
+        int buffOff = (int) (this.curr_ - this.lo_);
+        System.arraycopy(b, off, this.buff_, buffOff, len);
+        this.curr_ += len;
+        return len;
+    }
+    
+    /**
+     * Ad-hoc smoke test: writes 16 random blocks of 32K to a data file,
+     * registers a checksum file for it with the ChecksumManager, then reads
+     * the 16 blocks back through a second instance that uses a 4MB buffer.
+     * The file locations are hard-coded Windows paths.
+     */
+    public static void main(String[] args) throws Throwable
+    {
+        final String dataFile = "C:\\Engagements\\Table-ColumnFamily-1-Data.dat";
+        final String checksumFile = "C:\\Engagements\\Checksum-1.db";
+        final int blockSize = 32*1024;
+        final int blockCount = 16;
+
+        /* write phase */
+        RandomAccessFile raf = new ChecksumRandomAccessFile(dataFile, "rw");
+        byte[] bytes = new byte[blockSize];
+        Random random = new Random();
+        int written = 0;
+        while ( written < blockCount )
+        {
+            random.nextBytes(bytes);
+            raf.write(bytes);
+            ++written;
+        }
+        raf.close();
+
+        ChecksumManager.instance(dataFile, checksumFile);
+
+        /* read phase */
+        raf = new ChecksumRandomAccessFile(dataFile, "rw", 4*1024*1024);
+        bytes = new byte[blockSize];
+        int consumed = 0;
+        while ( consumed < blockCount )
+        {
+            raf.readFully(bytes);
+            ++consumed;
+        }
+        raf.close();
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+/**
+ * A pair of offsets describing the section of a file
+ * that needs to be scanned.
+*/
+class Coordinate
+{
+    /* offset at which the section begins */
+    long start_;
+    /* offset at which the section ends */
+    long end_;
+
+    Coordinate(long start, long end)
+    {
+        this.start_ = start;
+        this.end_ = end;
+    }
+}

Added: incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java Mon Mar  2 06:12:46 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+/**
+ * An implementation of the DataInputStream interface. This instance is completely thread 
+ * unsafe.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class DataInputBuffer extends DataInputStream
+{
+    /*
+     * This is a clone of the ByteArrayInputStream class w/o any
+     * method being synchronized.
+    */
+    public static class FastByteArrayInputStream extends InputStream
+    {             
+        /**
+         * An array of bytes that was provided by the creator of the stream.
+         * Elements <code>buf[0]</code> through <code>buf[count-1]</code> are
+         * the only bytes that can ever be read from the stream; element
+         * <code>buf[pos]</code> is the next byte to be read.
+         */
+        protected byte buf[];
+
+        /**
+         * The index of the next character to read from the input stream buffer.
+         * This value should always be nonnegative and not larger than the value of
+         * <code>count</code>. The next byte to be read from the input stream
+         * buffer will be <code>buf[pos]</code>.
+         */
+        protected int pos;
+
+        /**
+         * The currently marked position in the stream. ByteArrayInputStream objects
+         * are marked at position zero by default when constructed. They may be
+         * marked at another position within the buffer by the <code>mark()</code>
+         * method. The current buffer position is set to this point by the
+         * <code>reset()</code> method.
+         * <p>
+         * If no mark has been set, then the value of mark is the offset passed to
+         * the constructor (or 0 if the offset was not supplied).
+         * 
+         * @since JDK1.1
+         */
+        protected int mark = 0;
+
+        /**
+         * The index one greater than the last valid character in the input stream
+         * buffer. This value should always be nonnegative and not larger than the
+         * length of <code>buf</code>. It is one greater than the position of the
+         * last byte within <code>buf</code> that can ever be read from the input
+         * stream buffer.
+         */
+        protected int count;
+        
+        /**
+         * Creates a stream over an empty array, so that nothing can be read
+         * until a buffer is supplied.
+         */
+        public FastByteArrayInputStream()
+        {
+            this(new byte[0]);
+        }
+
+        /**
+         * Creates a stream that reads directly from <code>buf</code>; the
+         * array is not copied. Reading starts at index <code>0</code> and
+         * may continue up to <code>buf.length</code>.
+         * 
+         * @param buf
+         *            the input buffer.
+         */
+        public FastByteArrayInputStream(byte buf[])
+        {
+            this.count = buf.length;
+            this.pos = 0;
+            this.buf = buf;
+        }
+
+        /**
+         * Creates <code>ByteArrayInputStream</code> that uses <code>buf</code>
+         * as its buffer array. The initial value of <code>pos</code> is
+         * <code>offset</code> and the initial value of <code>count</code> is
+         * the minimum of <code>offset+length</code> and <code>buf.length</code>.
+         * The buffer array is not copied. The buffer's mark is set to the specified
+         * offset.
+         * 
+         * @param buf
+         *            the input buffer.
+         * @param offset
+         *            the offset in the buffer of the first byte to read.
+         * @param length
+         *            the maximum number of bytes to read from the buffer.
+         */
+        public FastByteArrayInputStream(byte buf[], int offset, int length)
+        {
+            this.buf = buf;
+            this.pos = offset;
+            this.count = Math.min(offset + length, buf.length);
+            this.mark = offset;
+        }
+        
+        /**
+         * Replaces the array this stream reads from. The array is not
+         * copied. Reading restarts at index <code>0</code> and may continue
+         * up to <code>bytes.length</code>; the mark is reset to
+         * <code>0</code> as well, so that a later <code>reset()</code> cannot
+         * jump to an offset that was marked in the previous array.
+         * 
+         * @param bytes
+         *            the new input buffer.
+         */
+        public final void setBytes(byte[] bytes)
+        {
+            buf = bytes;
+            pos = 0;
+            // a mark taken on the previous array is meaningless for the new one
+            mark = 0;
+            count = bytes.length;
+        }
+
+        /**
+         * Returns the next byte of the stream as an <code>int</code> in the
+         * range <code>0</code> to <code>255</code> and advances the read
+         * position, or returns <code>-1</code> when the position has reached
+         * <code>count</code>.
+         * <p>
+         * This <code>read</code> method cannot block.
+         * 
+         * @return the next byte of data, or <code>-1</code> if the end of the
+         *         stream has been reached.
+         */
+        public final int read()
+        {
+            if (pos >= count)
+            {
+                return -1;
+            }
+            return buf[pos++] & 0xFF;
+        }
+
+        /**
+         * Reads up to <code>len</code> bytes of data into an array of bytes from
+         * this input stream. If <code>pos</code> equals <code>count</code>,
+         * then <code>-1</code> is returned to indicate end of file. Otherwise,
+         * the number <code>k</code> of bytes read is equal to the smaller of
+         * <code>len</code> and <code>count-pos</code>. If <code>k</code> is
+         * positive, then bytes <code>buf[pos]</code> through
+         * <code>buf[pos+k-1]</code> are copied into <code>b[off]</code> through
+         * <code>b[off+k-1]</code> in the manner performed by
+         * <code>System.arraycopy</code>. The value <code>k</code> is added
+         * into <code>pos</code> and <code>k</code> is returned.
+         * <p>
+         * This <code>read</code> method cannot block.
+         * 
+         * @param b
+         *            the buffer into which the data is read.
+         * @param off
+         *            the start offset in the destination array <code>b</code>
+         * @param len
+         *            the maximum number of bytes read.
+         * @return the total number of bytes read into the buffer, or
+         *         <code>-1</code> if there is no more data because the end of the
+         *         stream has been reached.
+         * @exception NullPointerException
+         *                If <code>b</code> is <code>null</code>.
+         * @exception IndexOutOfBoundsException
+         *                If <code>off</code> is negative, <code>len</code> is
+         *                negative, or <code>len</code> is greater than
+         *                <code>b.length - off</code>
+         */
+        public final int read(byte b[], int off, int len)
+        {
+            if (b == null)
+            {
+                throw new NullPointerException();
+            }
+            else if (off < 0 || len < 0 || len > b.length - off)
+            {
+                throw new IndexOutOfBoundsException();
+            }
+            if (pos >= count)
+            {
+                return -1;
+            }
+            if (pos + len > count)
+            {
+                len = count - pos;
+            }
+            if (len <= 0)
+            {
+                return 0;
+            }
+            System.arraycopy(buf, pos, b, off, len);
+            pos += len;
+            return len;
+        }
+
+        /**
+         * Skips <code>n</code> bytes of input from this input stream. Fewer bytes
+         * might be skipped if the end of the input stream is reached. The actual
+         * number <code>k</code> of bytes to be skipped is equal to the smaller of
+         * <code>n</code> and <code>count-pos</code>, and is never negative. The
+         * value <code>k</code> is added into <code>pos</code> and <code>k</code>
+         * is returned.
+         * 
+         * @param n
+         *            the number of bytes to be skipped.
+         * @return the actual number of bytes skipped.
+         */
+        public final long skip(long n)
+        {
+            final long remaining = (long) count - pos;
+            if (n <= 0 || remaining <= 0)
+            {
+                return 0;
+            }
+            // Compare n against what is left instead of computing pos + n,
+            // which overflows for very large n (e.g. Long.MAX_VALUE) and
+            // would then bypass the clamp and corrupt pos.
+            final long skipped = (n < remaining) ? n : remaining;
+            pos += (int) skipped;
+            return skipped;
+        }
+
+        /**
+         * Reports how many bytes are left between the read position and the
+         * end of the readable range, i.e. <code>count&nbsp;- pos</code>.
+         * 
+         * @return the number of remaining bytes that can be read (or skipped over)
+         *         from this input stream without blocking.
+         */
+        public final int available()
+        {
+            final int remaining = count - pos;
+            return remaining;
+        }
+
+        /**
+         * Tests if this <code>InputStream</code> supports mark/reset. This
+         * implementation always returns <code>true</code>.
+         * 
+         * @return <code>true</code>, always.
+         */
+        public final boolean markSupported()
+        {
+            return true;
+        }
+
+        /**
+         * Remembers the current read position so that a later
+         * <code>reset()</code> returns to it.
+         * <p>
+         * Note: The <code>readAheadLimit</code> for this class has no meaning;
+         * the argument is ignored.
+         * 
+         * @param readAheadLimit
+         *            ignored.
+         */
+        public final void mark(int readAheadLimit)
+        {
+            this.mark = this.pos;
+        }
+
+        /**
+         * Moves the read position back to the marked position. The marked
+         * position is 0 unless another position was marked or an offset was
+         * specified in the constructor.
+         */
+        public final void reset()
+        {
+            this.pos = this.mark;
+        }
+
+        /**
+         * Closing this stream has no effect: the method body is empty, so the
+         * other methods of this class keep working after <code>close()</code>
+         * has been called.
+         * <p>
+         */
+        public final void close() throws IOException
+        {
+        }
+    }
+    
+    /*
+     * A FastByteArrayInputStream whose backing array, read position and
+     * end index can be replaced and inspected by the enclosing class.
+    */
+    private static class Buffer extends FastByteArrayInputStream
+    {
+        public Buffer()
+        {
+            super(new byte[0]);
+        }
+
+        /*
+         * Points the stream at "input": reading (and the mark) start at
+         * "start" and the end index becomes "start + length". The end
+         * index is not checked against the length of "input".
+        */
+        public void reset(byte[] input, int start, int length)
+        {
+            this.buf = input;
+            this.pos = start;
+            this.mark = start;
+            this.count = start + length;
+        }
+
+        /* Index of the next byte to be read. */
+        public int getPosition()
+        {
+            return this.pos;
+        }
+
+        /* Moves the read position to "position"; the value is not validated. */
+        public void setPosition(int position)
+        {
+            this.pos = position;
+        }
+
+        /* End index of the readable range (one past the last readable byte). */
+        public int getLength()
+        {
+            return this.count;
+        }
+    }
+
+    private Buffer buffer_;
+
+    /**
+     * Constructs a new empty buffer. Nothing can be read from it until one of
+     * the <code>reset</code> methods supplies the data.
+     */
+    public DataInputBuffer()
+    {
+        this(new Buffer());
+    }
+
+    /**
+     * Wraps <code>buffer</code> as the underlying stream and keeps a typed
+     * reference to it, so that its contents and position can be changed
+     * later.
+     */
+    private DataInputBuffer(Buffer buffer)
+    {
+        super(buffer);
+        this.buffer_ = buffer;
+    }
+   
+    /**
+     * Resets the data that the buffer reads to the first <code>length</code>
+     * bytes of <code>input</code>; equivalent to
+     * <code>reset(input, 0, length)</code>.
+     */
+    public void reset(byte[] input, int length)
+    {
+        this.reset(input, 0, length);
+    }
+
+    /**
+     * Resets the data that the buffer reads to <code>length</code> bytes of
+     * <code>input</code> beginning at index <code>start</code>. The array is
+     * not copied.
+     */
+    public void reset(byte[] input, int start, int length)
+    {
+        this.buffer_.reset(input, start, length);
+    }
+
+    /**
+     * Returns the current position in the input, i.e. the index in the
+     * backing array of the next byte to be read.
+     */
+    public int getPosition()
+    {
+        final int position = this.buffer_.getPosition();
+        return position;
+    }
+    
+    /**
+     * Sets the position within the input, i.e. the index in the backing
+     * array of the next byte to be read. The value is not validated.
+     */
+    public void setPosition(int position)
+    {
+        this.buffer_.setPosition(position);
+    }
+
+    /**
+     * Returns the length of the input as recorded by the underlying buffer,
+     * i.e. the index one past the last readable byte. After
+     * <code>reset(input, start, length)</code> this is
+     * <code>start + length</code>.
+     */
+    public int getLength()
+    {
+        final int end = this.buffer_.getLength();
+        return end;
+    }
+}