You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC
svn commit: r749205 [13/16] - in
/incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/
config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/
cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/...
Added: incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/AIORandomAccessFile.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,789 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.CompletionHandler;
+import java.nio.file.OpenOption;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.ContinuationContext;
+import org.apache.cassandra.concurrent.ContinuationsExecutor;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.IContinuable;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.commons.javaflow.Continuation;
+import org.apache.log4j.Logger;
+
+/**
+ * An <code>AIORandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access.
+ * <P>
+ *
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class AIORandomAccessFile extends RandomAccessFile
+{
+ private final static Logger logger_ = Logger.getLogger(AIORandomAccessFile.class);
+ private final static ThreadLocal<Runnable> tls_ = new InheritableThreadLocal<Runnable>();
+ static final int LogBuffSz_ = 16; // 64K buffer
+ public static final int BuffSz_ = (1 << LogBuffSz_);
+ static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+ /* Used to lock the creation of the disk thread pool instance */
+ private static Lock createLock_ = new ReentrantLock();
+ private static ExecutorService diskIOPool_;
+
+ /**
+ * Submits a read request to the Kernel and is used
+ * only when running in Continuations mode. The kernel
+ * on read completion will resume the continuation passed
+ * in to complete the read request.
+ *
+ * @author alakshman
+ *
+ */
+ class AIOReader implements IContinuable
+ {
+ /* the continuation that needs to be resumed on read completion */
+ private ContinuationContext continuationCtx_;
+
+ AIOReader(ContinuationContext continuationCtx)
+ {
+ continuationCtx_ = continuationCtx;
+ }
+
+ public void run(Continuation c)
+ {
+ /* submit the read request */
+ continuationCtx_.setContinuation(c);
+ ByteBuffer buffer = ByteBuffer.wrap( buffer_ );
+ fileChannel_.read(buffer, diskPos_, continuationCtx_, new ReadCompletionHandler());
+ }
+ }
+
+ /**
+ * Read completion handler for the AIO framework. The attachment
+ * that is passed in is the ContinuationContext whose continuation
+ * needs to be resumed on read completion. The number of bytes read
+ * is stored on the context before the continuation is resumed.
+ *
+ * @author alakshman
+ *
+ */
+ class ReadCompletionHandler implements CompletionHandler<Integer, ContinuationContext>
+ {
+     /** Invoked when the read is cancelled. Nothing is resumed. */
+     public void cancelled(ContinuationContext attachment)
+     {
+     }
+
+     /**
+      * Records the byte count on the context and resumes the
+      * continuation stored in it, if there is one.
+      *
+      * @param result number of bytes read as reported by the channel.
+      * @param attachment context of the suspended reader; may be null
+      *        when the read was submitted without a continuation.
+      */
+     public void completed(Integer result, ContinuationContext attachment)
+     {
+         logger_.debug("Bytes read " + result);
+         if ( attachment == null )
+         {
+             return;
+         }
+
+         Continuation c = attachment.getContinuation();
+         /* the result is published before the continuation is resumed */
+         attachment.result(result);
+         if ( c != null )
+         {
+             c = Continuation.continueWith(c, attachment);
+             ContinuationsExecutor.doPostProcessing(c);
+         }
+     }
+
+     /**
+      * Invoked when the read fails. The failure is logged so that it
+      * does not disappear silently.
+      * NOTE(review): the continuation held by the attachment is not
+      * resumed here, so a suspended reader is presumably left waiting;
+      * confirm the intended recovery path.
+      */
+     public void failed(Throwable th, ContinuationContext attachment)
+     {
+         logger_.warn(LogUtil.throwableToString(th));
+     }
+ }
+
+ /*
+ * This implementation is based on the buffer implementation in Modula-3's
+ * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+ */
+ private boolean dirty_; // true iff unflushed bytes exist
+ private boolean closed_; // true iff the file is closed
+ private long curr_; // current position in file
+ private long lo_, hi_; // bounds on characters in "buff"
+ private byte[] buffer_ = new byte[0]; // local buffer
+ private long maxHi_; // this.lo + this.buff.length
+ private boolean hitEOF_; // buffer contains last file block?
+ private long diskPos_; // disk position
+ private AsynchronousFileChannel fileChannel_; // asynchronous file channel used for AIO.
+ private boolean bContinuations_; // indicates if used in continuations mode.
+
+ /*
+ * To describe the above fields, we introduce the following abstractions for
+ * the file "f":
+ *
+ * len(f) the length of the file curr(f) the current position in the file
+ * c(f) the abstract contents of the file disk(f) the contents of f's
+ * backing disk file closed(f) true iff the file is closed
+ *
+ * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+ * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+ * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+ * operation has the effect of making "disk(f)" identical to "c(f)".
+ *
+ * A file is said to be *valid* if the following conditions hold:
+ *
+ * V1. The "closed" and "curr" fields are correct:
+ *
+ * f.closed == closed(f) f.curr == curr(f)
+ *
+ * V2. The current position is either contained in the buffer, or just past
+ * the buffer:
+ *
+ * f.lo <= f.curr <= f.hi
+ *
+ * V3. Any (possibly) unflushed characters are stored in "f.buff":
+ *
+ * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+ *
+ * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+ *
+ * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+ * disk(f)[i])
+ *
+ * V5. "f.dirty" is true iff the buffer contains bytes that should be
+ * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+ *
+ * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+ *
+ * V6. this.maxHi == this.lo + this.buff.length
+ *
+ * Note that "f.buff" can be "null" in a valid file, since the range of
+ * characters in V3 is empty when "f.lo == f.curr".
+ *
+ * A file is said to be *ready* if the buffer contains the current position,
+ * i.e., when:
+ *
+ * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+ *
+ * When a file is ready, reading or writing a single byte can be performed
+ * by reading or writing the in-memory buffer without performing a disk
+ * operation.
+ */
+
+ /**
+ * Open a AIORandomAccessFile for r/w operations.
+ * @param file file to be opened.
+ */
+ public AIORandomAccessFile(File file) throws IOException
+ {
+ super(file, "rw");
+ this.init(file, 0, false);
+ }
+
+ /**
+ * Open a AIORandomAccessFile for r/w operations.
+ * @param file file to be opened.
+ * @param bContinuations specify if continuations
+ * support is required.
+ */
+ public AIORandomAccessFile(File file, boolean bContinuations) throws IOException
+ {
+ super(file, "rw");
+ this.init(file, 0, bContinuations);
+ }
+
+ /**
+ * Open a AIORandomAccessFile for r/w operations.
+ * @param file file to be opened
+ * @param size amount of data to be buffer as part
+ * of r/w operations
+ * @throws IOException
+ */
+ public AIORandomAccessFile(File file, int size) throws IOException
+ {
+ super(file, "rw");
+ init(file, size, false);
+ }
+
+ /**
+ * Open a AIORandomAccessFile for r/w operations.
+ * @param file file to be opened
+ * @param size amount of data to be buffer as part
+ * of r/w operations
+ * @param bContinuations specify if continuations
+ * support is required.
+ * @throws IOException
+ */
+ public AIORandomAccessFile(File file, int size, boolean bContinuations) throws IOException
+ {
+ super(file, "rw");
+ init(file, size, bContinuations);
+ }
+
+ /**
+ * Open a AIORandomAccessFile for r/w operations.
+ * @param name of file to be opened.
+ */
+ public AIORandomAccessFile(String name) throws IOException
+ {
+ super(name, "rw");
+ this.init(new File(name), 0, false);
+ }
+
+ /**
+ * Open a AIORandomAccessFile for r/w operations.
+ * @param name of file to be opened.
+ * @param bContinuations specify if continuations
+ * support is required.
+ */
+ public AIORandomAccessFile(String name, boolean bContinuations) throws IOException
+ {
+ super(name, "rw");
+ this.init(new File(name), 0, bContinuations);
+ }
+
+ /**
+ * Open a AIORandomAccessFile for r/w operations.
+ * @param name of file to be opened.
+ * @param size buffering size to be used.
+ */
+ public AIORandomAccessFile(String name, int size) throws IOException
+ {
+ super(name, "rw");
+ this.init(new File(name), size, false);
+ }
+
+ /**
+ * Open a AIORandomAccessFile for r/w operations.
+ * @param name of file to be opened.
+ * @param size buffering size to be used.
+ * @param bContinuations specify if continuations
+ * support is required.
+ * @throws IOException
+ */
+ public AIORandomAccessFile(String name, int size, boolean bContinuations) throws IOException
+ {
+ super(name, "rw");
+ this.init(new File(name), size, bContinuations);
+ }
+
+ /**
+ * Resets the buffering state, lazily creates the shared disk I/O
+ * pool and opens the asynchronous channel on <i>file</i>.
+ *
+ * @param file file the asynchronous channel is opened on.
+ * @param size requested buffer size; BuffSz_ is used when it is smaller.
+ * @param bVal true if reads should be performed in continuations mode.
+ * @throws IOException if the asynchronous channel cannot be opened.
+ */
+ private void init(File file, int size, boolean bVal) throws IOException
+ {
+     bContinuations_ = bVal;
+     this.dirty_ = this.closed_ = false;
+     this.lo_ = this.curr_ = this.hi_ = 0;
+     this.buffer_ = new byte[Math.max(size, BuffSz_)];
+     /* V6: maxHi == lo + buff.length, and lo is 0 here */
+     this.maxHi_ = (long) this.buffer_.length;
+     this.hitEOF_ = false;
+     this.diskPos_ = 0L;
+
+     /*
+      * The pool field is not volatile, so it is only ever examined
+      * while holding the lock instead of being checked outside it first.
+      */
+     createLock_.lock();
+     try
+     {
+         if ( diskIOPool_ == null )
+         {
+             int maxThreads = DatabaseDescriptor.getThreadsPerPool();
+             diskIOPool_ = new ContinuationsExecutor( maxThreads,
+                                                      maxThreads,
+                                                      Integer.MAX_VALUE,
+                                                      TimeUnit.SECONDS,
+                                                      new LinkedBlockingQueue<Runnable>(),
+                                                      new ThreadFactoryImpl("DISK-IO-POOL")
+                                                    );
+         }
+     }
+     finally
+     {
+         createLock_.unlock();
+     }
+
+     /* set up the asynchronous file channel */
+     OpenOption[] openOptions = new OpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ};
+     Set<OpenOption> set = new HashSet<OpenOption>( Arrays.asList(openOptions) );
+     boolean opened = false;
+     try
+     {
+         fileChannel_ = AsynchronousFileChannel.open(file.toPath(), set, diskIOPool_);
+         opened = true;
+     }
+     finally
+     {
+         /* the superclass constructor already opened the file; do not leak it */
+         if ( !opened )
+         {
+             super.close();
+         }
+     }
+ }
+
+ /**
+ * Writes out any dirty bytes, waiting for that write to complete,
+ * and then releases both the asynchronous channel and the file
+ * opened by the RandomAccessFile superclass.
+ *
+ * @throws IOException if the final flush or either close fails.
+ */
+ public void close() throws IOException
+ {
+     try
+     {
+         this.onClose();
+     }
+     finally
+     {
+         this.closed_ = true;
+         try
+         {
+             fileChannel_.close();
+         }
+         finally
+         {
+             /* the superclass constructor opened its own handle on the file */
+             super.close();
+         }
+     }
+ }
+
+ /**
+ * Flush any bytes in the file's buffer that have not yet been written to
+ * disk. If the file was created read-only, this method is a no-op.
+ */
+ public void flush() throws IOException
+ {
+ this.flushBuffer();
+ }
+
+ /**
+ * Flush any dirty bytes in the buffer to disk. The write is
+ * submitted without waiting for its completion.
+ */
+ private void flushBuffer() throws IOException
+ {
+     if (!this.dirty_)
+     {
+         return;
+     }
+     /* doWrite derives the number of bytes from curr_ and lo_ itself */
+     doWrite(this.lo_, false);
+     this.diskPos_ = this.curr_;
+     this.dirty_ = false;
+ }
+
+ /**
+ * Invoked when close() is invoked and causes the flush
+ * of the last few bytes to block when the write is submitted.
+ * @throws IOException
+ */
+ private void onClose() throws IOException
+ {
+     if (!this.dirty_)
+     {
+         return;
+     }
+     /* doWrite derives the number of bytes from curr_ and lo_ itself */
+     doWrite(this.lo_, true);
+     this.diskPos_ = this.curr_;
+     this.dirty_ = false;
+ }
+
+ /**
+ * This method submits an I/O for write where the write happens at
+ * <i>position</i> within the file. The bytes written are those in
+ * the buffer between lo_ and curr_.
+ *
+ * @param position to seek to within the file
+ * @param onClose indicates if this method was invoked on a close();
+ *        when true the call waits for the write to complete.
+ */
+ private void doWrite(long position, boolean onClose)
+ {
+     int length = (int) (this.curr_ - this.lo_);
+     /*
+      * Hand the channel a private snapshot of the dirty bytes. The
+      * in-flight write is then unaffected by later changes to
+      * buffer_, and buffer_ keeps the contents that lo_/hi_ describe.
+      * Swapping in an empty array here would make later reads from
+      * the buffer return zeros and later flushes write zeros over
+      * bytes that were already flushed.
+      */
+     ByteBuffer buffer = ByteBuffer.wrap(Arrays.copyOf(buffer_, length));
+     Future<Integer> futurePtr = fileChannel_.write(buffer, position, null, new WriteCompletionHandler<Integer>());
+     if ( onClose )
+     {
+         try
+         {
+             /* this will block but will execute only on a close() */
+             futurePtr.get();
+         }
+         catch (ExecutionException ex)
+         {
+             logger_.warn(LogUtil.throwableToString(ex));
+         }
+         catch (InterruptedException ex)
+         {
+             logger_.warn(LogUtil.throwableToString(ex));
+             /* keep the interrupt visible to the caller */
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+
+ /**
+ * Submits a read of up to buffer_.length bytes at diskPos_ and waits
+ * for it to complete. On success the bytes read become the new
+ * buffer. When the channel reports end-of-file, hitEOF_ is set, the
+ * buffer is filled with 0xff and 0 is returned.
+ *
+ * @return the number of bytes read; 0 at end-of-file.
+ * @throws IOException if the asynchronous read fails.
+ */
+ private int fillBuffer() throws IOException
+ {
+     int cnt = 0;
+     ByteBuffer buffer = ByteBuffer.allocate(buffer_.length);
+     Future<Integer> futurePtr = fileChannel_.read(buffer, this.diskPos_, null, new ReadCompletionHandler());
+
+     try
+     {
+         /*
+          * This should block
+          */
+         cnt = futurePtr.get();
+     }
+     catch (ExecutionException ex)
+     {
+         /* a failed read must not look like an empty block */
+         throw new IOException("Asynchronous read failed at position " + this.diskPos_, ex);
+     }
+     catch (InterruptedException ex)
+     {
+         logger_.warn(LogUtil.throwableToString(ex));
+         /* keep the interrupt visible to the caller */
+         Thread.currentThread().interrupt();
+     }
+
+     /*
+      * Recomputed on every fill. If the flag were only ever set, one
+      * read at end-of-file would make every later block look like
+      * the last one.
+      */
+     this.hitEOF_ = (cnt < 0);
+     if ( this.hitEOF_ )
+     {
+         // make sure buffer that wasn't read is initialized with -1
+         cnt = 0;
+         Arrays.fill(buffer_, (byte) 0xff);
+     }
+     else
+     {
+         buffer_ = buffer.array();
+     }
+     this.diskPos_ += cnt;
+     return cnt;
+ }
+
+ /**
+ * Read as much data as indicated by the size of the buffer.
+ * This method is only invoked in continuation mode. It registers an
+ * AIOReader for the current continuation, suspends, and picks the
+ * byte count up from the continuation context once resumed.
+ *
+ * @return the number of bytes read; 0 at end-of-file.
+ */
+ private int fillBuffer2()
+ {
+     ContinuationContext continuationCtx = (ContinuationContext)Continuation.getContext();
+     IContinuable reader = new AIOReader( continuationCtx );
+     ContinuationsExecutor.putInTls(reader);
+     /* suspend the continuation */
+     Continuation.suspend();
+
+     continuationCtx = (ContinuationContext)Continuation.getContext();
+     int cnt = (Integer)continuationCtx.result();
+
+     /*
+      * Recomputed on every fill. If the flag were only ever set, one
+      * read at end-of-file would make every later block look like
+      * the last one.
+      */
+     this.hitEOF_ = (cnt < 0);
+     if ( this.hitEOF_ )
+     {
+         // make sure buffer that wasn't read is initialized with -1
+         cnt = 0;
+         Arrays.fill(buffer_, (byte) 0xff);
+     }
+     this.diskPos_ += cnt;
+     return cnt;
+ }
+
+ /**
+ * Moves the current position to <code>pos</code>. A position inside
+ * the buffered range [lo, hi) needs no read; dirty bytes are only
+ * flushed first when moving backwards. Any other position flushes
+ * the buffer and loads the block that starts at the BuffSz boundary
+ * at or below <code>pos</code>.
+ *
+ * @param pos the new position within the file.
+ * @throws IOException if flushing or refilling the buffer fails.
+ */
+ public void seek(long pos) throws IOException
+ {
+     boolean withinBuffer = (pos >= this.lo_) && (pos < this.hi_);
+     if (withinBuffer)
+     {
+         // moving backwards inside the buffer requires a flush to maintain V4
+         if (pos < this.curr_)
+         {
+             this.flushBuffer();
+         }
+     }
+     else
+     {
+         // target lies outside the buffer -- flush, then load its block
+         this.flushBuffer();
+         this.lo_ = pos & BuffMask_;
+         this.maxHi_ = this.lo_ + (long) this.buffer_.length;
+         this.diskPos_ = this.lo_;
+
+         /* continuation mode suspends for the read; otherwise block on it */
+         int bytesRead = bContinuations_ ? fillBuffer2() : fillBuffer();
+         this.hi_ = this.lo_ + (long) bytesRead;
+     }
+     this.curr_ = pos;
+ }
+
+
+ public long getFilePointer()
+ {
+ return this.curr_;
+ }
+
+ public long length() throws IOException
+ {
+ return Math.max(this.curr_, super.length());
+ }
+
+ public int read() throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ // test for EOF
+ // if (this.hi < this.maxHi) return -1;
+ if (this.hitEOF_)
+ return -1;
+
+ // slow path -- read another buffer
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ return -1;
+ }
+ byte res = this.buffer_[(int) (this.curr_ - this.lo_)];
+ this.curr_++;
+ return ((int) res) & 0xFF; // convert byte -> int
+ }
+
+ public int read(byte[] b) throws IOException
+ {
+ return this.read(b, 0, b.length);
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ // test for EOF
+ // if (this.hi < this.maxHi) return -1;
+ if (this.hitEOF_)
+ return -1;
+
+ // slow path -- read another buffer
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ return -1;
+ }
+ len = Math.min(len, (int) (this.hi_ - this.curr_));
+ int buffOff = (int) (this.curr_ - this.lo_);
+ System.arraycopy(this.buffer_, buffOff, b, off, len);
+ this.curr_ += len;
+ return len;
+ }
+
+ public void write(int b) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ if (this.hitEOF_ && this.hi_ < this.maxHi_)
+ {
+ // at EOF -- bump "hi"
+ this.hi_++;
+ }
+ else
+ {
+ // slow path -- write current buffer; read next one
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ {
+ // appending to EOF -- bump "hi"
+ this.hi_++;
+ }
+ }
+ }
+ this.buffer_[(int) (this.curr_ - this.lo_)] = (byte) b;
+ this.curr_++;
+ this.dirty_ = true;
+ }
+
+ public void write(byte[] b) throws IOException
+ {
+ this.write(b, 0, b.length);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ while (len > 0)
+ {
+ int n = this.writeAtMost(b, off, len);
+ off += n;
+ len -= n;
+ }
+ this.dirty_ = true;
+ }
+
+ /*
+ * Copy at most "len" bytes from "b", starting at offset "off", into the
+ * buffer at the current position, and return the number of bytes copied.
+ * Fewer than "len" bytes are copied when the buffer has less room left
+ * before "hi". This method does not set the dirty flag itself.
+ */
+ private int writeAtMost(byte[] b, int off, int len) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ if (this.hitEOF_ && this.hi_ < this.maxHi_)
+ {
+ // at EOF -- bump "hi"
+ this.hi_ = this.maxHi_;
+ }
+ else
+ {
+ // slow path -- write current buffer; read next one
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ {
+ // appending to EOF -- bump "hi"
+ this.hi_ = this.maxHi_;
+ }
+ }
+ }
+ // never copy past the end of the buffered range
+ len = Math.min(len, (int) (this.hi_ - this.curr_));
+ int buffOff = (int) (this.curr_ - this.lo_);
+ System.arraycopy(b, off, this.buffer_, buffOff, len);
+ this.curr_ += len;
+ return len;
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ /*
+ int i = 0;
+ try
+ {
+ RandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"), 64*1024);
+ aRaf2.seek(0L);
+ while ( i < 10000 )
+ {
+ aRaf2.writeInt(32);
+ aRaf2.writeUTF("Avinash Lakshman");
+ ++i;
+ }
+ aRaf2.close();
+ }
+ catch( IOException ex )
+ {
+ ex.printStackTrace();
+ }
+ */
+ /*
+ int j = 0;
+ try
+ {
+ RandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat") );
+ while ( j < 10 )
+ {
+ System.out.println( aRaf2.readInt() );
+ System.out.println( aRaf2.readUTF() );
+ ++j;
+ }
+ aRaf2.close();
+ }
+ catch( IOException ex )
+ {
+ ex.printStackTrace();
+ }
+ */
+
+ ExecutorService es = new ContinuationsExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
+ es.execute(new ReadImpl());
+ }
+}
+
+class ReadImpl implements Runnable
+{
+ public void run()
+ {
+ int i = 0;
+ try
+ {
+ System.out.println("About to start the whole thing ...");
+ AIORandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"), true );
+ System.out.println("About to seek ...");
+
+ //aRaf2.seek(0L);
+ System.out.println( aRaf2.readInt() );
+ System.out.println( aRaf2.readUTF() );
+
+ System.out.println("About to seek a second time ...");
+ aRaf2.seek(66000L);
+ System.out.println( aRaf2.readInt() );
+ System.out.println( aRaf2.readUTF() );
+
+ aRaf2.close();
+ }
+ catch( IOException ex )
+ {
+ ex.printStackTrace();
+ }
+ }
+}
+
+class WriteImpl implements Runnable
+{
+ public void run()
+ {
+ int i = 0;
+ try
+ {
+ AIORandomAccessFile aRaf2 = new AIORandomAccessFile( new File("/var/cassandra/test.dat"));
+ while ( i < 10000 )
+ {
+ aRaf2.writeInt(32);
+ aRaf2.writeUTF("Avinash Lakshman thinks John McCain is an idiot");
+ ++i;
+ }
+ aRaf2.close();
+ }
+ catch( IOException ex )
+ {
+ ex.printStackTrace();
+ }
+ }
+}
+
+/**
+ * Write completion handler for AIO framework. The context
+ * that is passed in, is a Continuation that needs to be
+ * resumed on write completion. For now the continuation is
+ * not used at all.
+ *
+ * @author alakshman
+ *
+ * @param <V> number of bytes written.
+ */
+class WriteCompletionHandler<V> implements CompletionHandler<V, Continuation>
+{
+    private final static Logger logger_ = Logger.getLogger(WriteCompletionHandler.class);
+
+    /** Invoked when the write is cancelled. Nothing is resumed. */
+    public void cancelled(Continuation attachment)
+    {
+    }
+
+    /**
+     * Logs the byte count and keeps resuming the attached
+     * continuation until continueWith() returns null.
+     */
+    public void completed(V result, Continuation attachment)
+    {
+        logger_.debug("Bytes written " + result);
+        while ( attachment != null )
+        {
+            attachment = Continuation.continueWith(attachment);
+        }
+    }
+
+    /**
+     * Invoked when the write fails. The failure is logged here
+     * because a write submitted without waiting on its Future has
+     * no other place where the error would surface.
+     */
+    public void failed(Throwable th, Continuation attachment)
+    {
+        logger_.warn(LogUtil.throwableToString(th));
+    }
+}
+
+
Added: incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/BufferedRandomAccessFile.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.Arrays;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A <code>BufferedRandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access.
+ * <P>
+ *
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class BufferedRandomAccessFile extends RandomAccessFile
+{
+ private static final Logger logger_ = Logger.getLogger(BufferedRandomAccessFile.class);
+ static final int LogBuffSz_ = 16; // 64K buffer
+ public static final int BuffSz_ = (1 << LogBuffSz_);
+ static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+
+ /*
+ * This implementation is based on the buffer implementation in Modula-3's
+ * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+ */
+ private boolean dirty_; // true iff unflushed bytes exist
+ private boolean closed_; // true iff the file is closed
+ private long curr_; // current position in file
+ private long lo_, hi_; // bounds on characters in "buff"
+ private byte[] buff_; // local buffer
+ private long maxHi_; // this.lo + this.buff.length
+ private boolean hitEOF_; // buffer contains last file block?
+ private long diskPos_; // disk position
+
+ /*
+ * To describe the above fields, we introduce the following abstractions for
+ * the file "f":
+ *
+ * len(f) the length of the file curr(f) the current position in the file
+ * c(f) the abstract contents of the file disk(f) the contents of f's
+ * backing disk file closed(f) true iff the file is closed
+ *
+ * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+ * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+ * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+ * operation has the effect of making "disk(f)" identical to "c(f)".
+ *
+ * A file is said to be *valid* if the following conditions hold:
+ *
+ * V1. The "closed" and "curr" fields are correct:
+ *
+ * f.closed == closed(f) f.curr == curr(f)
+ *
+ * V2. The current position is either contained in the buffer, or just past
+ * the buffer:
+ *
+ * f.lo <= f.curr <= f.hi
+ *
+ * V3. Any (possibly) unflushed characters are stored in "f.buff":
+ *
+ * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+ *
+ * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+ *
+ * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+ * disk(f)[i])
+ *
+ * V5. "f.dirty" is true iff the buffer contains bytes that should be
+ * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+ *
+ * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+ *
+ * V6. this.maxHi == this.lo + this.buff.length
+ *
+ * Note that "f.buff" can be "null" in a valid file, since the range of
+ * characters in V3 is empty when "f.lo == f.curr".
+ *
+ * A file is said to be *ready* if the buffer contains the current position,
+ * i.e., when:
+ *
+ * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+ *
+ * When a file is ready, reading or writing a single byte can be performed
+ * by reading or writing the in-memory buffer without performing a disk
+ * operation.
+ */
+
+ /**
+ * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
+ * in mode <code>mode</code>, which should be "r" for reading only, or
+ * "rw" for reading and writing.
+ */
+ public BufferedRandomAccessFile(File file, String mode) throws IOException
+ {
+ super(file, mode);
+ this.init(0);
+ }
+
+ public BufferedRandomAccessFile(File file, String mode, int size) throws IOException
+ {
+ super(file, mode);
+ this.init(size);
+ }
+
+ /**
+ * Open a new <code>BufferedRandomAccessFile</code> on the file named
+ * <code>name</code> in mode <code>mode</code>, which should be "r" for
+ * reading only, or "rw" for reading and writing.
+ */
+ public BufferedRandomAccessFile(String name, String mode) throws IOException
+ {
+ super(name, mode);
+ this.init(0);
+ }
+
+ public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
+ {
+ super(name, mode);
+ this.init(size);
+ }
+
+ /*
+ * Reset the buffering state. The buffer holds "size" bytes, or
+ * BuffSz_ bytes when "size" is smaller than that.
+ */
+ private void init(int size)
+ {
+     this.dirty_ = this.closed_ = false;
+     this.lo_ = this.curr_ = this.hi_ = 0;
+     this.buff_ = new byte[Math.max(size, BuffSz_)];
+     /* V6: maxHi == lo + buff.length, and lo is 0 here */
+     this.maxHi_ = (long) this.buff_.length;
+     this.hitEOF_ = false;
+     this.diskPos_ = 0L;
+ }
+
+ /**
+ * Flush any unwritten bytes and close the file. The underlying
+ * file is closed even when the flush fails.
+ *
+ * @throws IOException if the flush or the close fails.
+ */
+ public void close() throws IOException
+ {
+     try
+     {
+         this.flush();
+     }
+     finally
+     {
+         this.closed_ = true;
+         super.close();
+     }
+ }
+
+ /**
+ * Flush any bytes in the file's buffer that have not yet been written to
+ * disk. If the file was created read-only, this method is a no-op.
+ */
+ public void flush() throws IOException
+ {
+ this.flushBuffer();
+ }
+
+ /* Flush any dirty bytes in the buffer to disk. */
+ private void flushBuffer() throws IOException
+ {
+ if (this.dirty_)
+ {
+ if (this.diskPos_ != this.lo_)
+ super.seek(this.lo_);
+ int len = (int) (this.curr_ - this.lo_);
+ super.write(this.buff_, 0, len);
+ this.diskPos_ = this.curr_;
+ this.dirty_ = false;
+ }
+ }
+
+ /*
+ * Read at most "this.buff.length" bytes into "this.buff", returning the
+ * number of bytes read. If the return result is less than
+ * "this.buff.length", then EOF was read; in that case "hitEOF" is set and
+ * the unread tail of the buffer is filled with 0xff.
+ */
+ private int fillBuffer() throws IOException
+ {
+     int cnt = 0;
+     int rem = this.buff_.length;
+     while (rem > 0)
+     {
+         int n = super.read(this.buff_, cnt, rem);
+         if (n < 0)
+             break;
+         cnt += n;
+         rem -= n;
+     }
+     /*
+      * "cnt" is never negative here, so EOF is detected by a short
+      * read. The flag is recomputed on every fill.
+      */
+     this.hitEOF_ = (cnt < this.buff_.length);
+     if (this.hitEOF_)
+     {
+         // make sure buffer that wasn't read is initialized with -1
+         Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
+     }
+     this.diskPos_ += cnt;
+     return cnt;
+ }
+
+ /*
+ * This method positions <code>this.curr</code> at position <code>pos</code>.
+ * If <code>pos</code> does not fall in the current buffer, it flushes the
+ * current buffer and loads the correct one.<p>
+ *
+ * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
+ * is at or past the end-of-file, which can only happen if the file was
+ * opened in read-only mode.
+ */
+ public void seek(long pos) throws IOException
+ {
+ if (pos >= this.hi_ || pos < this.lo_)
+ {
+ // seeking outside of current buffer -- flush and read
+ this.flushBuffer();
+ this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+ this.maxHi_ = this.lo_ + (long) this.buff_.length;
+ if (this.diskPos_ != this.lo_)
+ {
+ super.seek(this.lo_);
+ this.diskPos_ = this.lo_;
+ }
+ int n = this.fillBuffer();
+ this.hi_ = this.lo_ + (long) n;
+ }
+ else
+ {
+ // seeking inside current buffer -- no read required
+ if (pos < this.curr_)
+ {
+ // if seeking backwards, we must flush to maintain V4
+ this.flushBuffer();
+ }
+ }
+ this.curr_ = pos;
+ }
+
+ public long getFilePointer()
+ {
+ return this.curr_;
+ }
+
+ public long length() throws IOException
+ {
+ return Math.max(this.curr_, super.length());
+ }
+
+ public int read() throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ // test for EOF
+ // if (this.hi < this.maxHi) return -1;
+ if (this.hitEOF_)
+ return -1;
+
+ // slow path -- read another buffer
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ return -1;
+ }
+ byte res = this.buff_[(int) (this.curr_ - this.lo_)];
+ this.curr_++;
+ return ((int) res) & 0xFF; // convert byte -> int
+ }
+
+ public int read(byte[] b) throws IOException
+ {
+ return this.read(b, 0, b.length);
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ // test for EOF
+ // if (this.hi < this.maxHi) return -1;
+ if (this.hitEOF_)
+ return -1;
+
+ // slow path -- read another buffer
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ return -1;
+ }
+ len = Math.min(len, (int) (this.hi_ - this.curr_));
+ int buffOff = (int) (this.curr_ - this.lo_);
+ System.arraycopy(this.buff_, buffOff, b, off, len);
+ this.curr_ += len;
+ return len;
+ }
+
+ public void write(int b) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ if (this.hitEOF_ && this.hi_ < this.maxHi_)
+ {
+ // at EOF -- bump "hi"
+ this.hi_++;
+ }
+ else
+ {
+ // slow path -- write current buffer; read next one
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ {
+ // appending to EOF -- bump "hi"
+ this.hi_++;
+ }
+ }
+ }
+ this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+ this.curr_++;
+ this.dirty_ = true;
+ }
+
+ public void write(byte[] b) throws IOException
+ {
+ this.write(b, 0, b.length);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ while (len > 0)
+ {
+ int n = this.writeAtMost(b, off, len);
+ off += n;
+ len -= n;
+ }
+ this.dirty_ = true;
+ }
+
+ /*
+ * Copy at most "len" bytes from "b", starting at offset "off", into the
+ * buffer at the current position, and return the number of bytes copied.
+ * Fewer than "len" bytes are copied when the buffer has less room left
+ * before "hi". This method does not set the dirty flag itself.
+ */
+ private int writeAtMost(byte[] b, int off, int len) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ if (this.hitEOF_ && this.hi_ < this.maxHi_)
+ {
+ // at EOF -- bump "hi"
+ this.hi_ = this.maxHi_;
+ }
+ else
+ {
+ // slow path -- write current buffer; read next one
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ {
+ // appending to EOF -- bump "hi"
+ this.hi_ = this.maxHi_;
+ }
+ }
+ }
+ // never copy past the end of the buffered range
+ len = Math.min(len, (int) (this.hi_ - this.curr_));
+ int buffOff = (int) (this.curr_ - this.lo_);
+ System.arraycopy(b, off, this.buff_, buffOff, len);
+ this.curr_ += len;
+ return len;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/ChecksumManager.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.zip.Adler32;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import bak.pcj.map.AbstractLongKeyLongMap;
+import bak.pcj.map.LongKeyLongChainedHashMap;
+
+/**
+ * This class manages the persistence of checksums and keeps
+ * them in memory. It maintains a mapping of data files on
+ * disk to their corresponding checksum files. It is also
+ * loads the checksums in memory on start up.
+ *
+ * @author alakshman
+ *
+ */
+class ChecksumManager
+{
+ private static Logger logger_ = Logger.getLogger(ChecksumManager.class);
+ /* Keeps a mapping of checksum manager instances to data file */
+ private static Map<String, ChecksumManager> chksumMgrs_ = new HashMap<String, ChecksumManager>();
+ private static Lock lock_ = new ReentrantLock();
+ private static final String checksumPrefix_ = "Checksum-";
+ private static final int bufferSize_ = 8*1024*1024;
+ private static final long chunkMask_ = 0x00000000FFFFFFFFL;
+ private static final long fileIdMask_ = 0x7FFFFFFF00000000L;
+ /* Map where checksums are cached. */
+ private static AbstractLongKeyLongMap chksums_ = new LongKeyLongChainedHashMap();
+
+ public static ChecksumManager instance(String dataFile) throws IOException
+ {
+ ChecksumManager chksumMgr = chksumMgrs_.get(dataFile);
+ if ( chksumMgr == null )
+ {
+ lock_.lock();
+ try
+ {
+ if ( chksumMgr == null )
+ {
+ chksumMgr = new ChecksumManager(dataFile);
+ chksumMgrs_.put(dataFile, chksumMgr);
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return chksumMgr;
+ }
+
+ /* TODO: Debug only */
+ public static ChecksumManager instance(String dataFile, String chkSumFile) throws IOException
+ {
+ ChecksumManager chksumMgr = chksumMgrs_.get(dataFile);
+ if ( chksumMgr == null )
+ {
+ lock_.lock();
+ try
+ {
+ if ( chksumMgr == null )
+ {
+ chksumMgr = new ChecksumManager(dataFile, chkSumFile);
+ chksumMgrs_.put(dataFile, chksumMgr);
+ }
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return chksumMgr;
+ }
+
+
+ /**
+ * On start read all the check sum files on disk and
+ * pull them into memory.
+ * @throws IOException
+ */
+ public static void onStart() throws IOException
+ {
+ String[] directories = DatabaseDescriptor.getAllDataFileLocations();
+ List<File> allFiles = new ArrayList<File>();
+ for ( String directory : directories )
+ {
+ File file = new File(directory);
+ File[] files = file.listFiles();
+ for ( File f : files )
+ {
+ if ( f.getName().contains(ChecksumManager.checksumPrefix_) )
+ {
+ allFiles.add(f);
+ }
+ }
+ }
+
+ for ( File file : allFiles )
+ {
+ int fId = SequenceFile.getFileId(file.getName());
+ ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length());
+
+ int chunk = 0;
+ while ( !chksumRdr.isEOF() )
+ {
+ long value = chksumRdr.readLong();
+ long key = ChecksumManager.key(fId, ++chunk);
+ chksums_.put(key, value);
+ }
+ }
+ }
+
+ /**
+ * On delete of this dataFile remove the checksums associated with
+ * this file from memory, remove the check sum manager instance.
+ *
+ * @param dataFile data file that is being deleted.
+ * @throws IOException
+ */
+ public static void onFileDelete(String dataFile) throws IOException
+ {
+ File f = new File(dataFile);
+ long size = f.length();
+ int fileId = SequenceFile.getFileId(f.getName());
+ int chunks = (int)(size >> 16L);
+
+ for ( int i = 0; i < chunks; ++i )
+ {
+ long key = ChecksumManager.key(fileId, i);
+ chksums_.remove(key);
+ }
+
+ /* remove the check sum manager instance */
+ chksumMgrs_.remove(dataFile);
+ String chksumFile = f.getParent() + System.getProperty("file.separator") + checksumPrefix_ + fileId + ".db";
+ FileUtils.delete(chksumFile);
+ }
+
+ private static long key(int fileId, int chunkId)
+ {
+ long key = 0;
+ key |= fileId;
+ key <<= 32;
+ key |= chunkId;
+ return key;
+ }
+
+ private RandomAccessFile raf_;
+ private Adler32 adler_ = new Adler32();
+
+ ChecksumManager(String dataFile) throws IOException
+ {
+ File file = new File(dataFile);
+ String directory = file.getParent();
+ String f = file.getName();
+ short fId = SequenceFile.getFileId(f);
+ String chkSumFile = directory + System.getProperty("file.separator") + checksumPrefix_ + fId + ".db";
+ raf_ = new RandomAccessFile(chkSumFile, "rw");
+ }
+
+ /* TODO: Remove later. */
+ ChecksumManager(String dataFile, String chkSumFile) throws IOException
+ {
+ File file = new File(dataFile);
+ String directory = file.getParent();
+ String f = file.getName();
+ short fId = SequenceFile.getFileId(f);
+ raf_ = new RandomAccessFile(chkSumFile, "rw");
+
+ file = new File(chkSumFile);
+ ChecksumReader chksumRdr = new ChecksumReader(file.getAbsolutePath(), 0L, file.length());
+
+ int chunk = 0;
+ while ( !chksumRdr.isEOF() )
+ {
+ long value = chksumRdr.readLong();
+ long key = ChecksumManager.key(fId, ++chunk);
+ chksums_.put(key, value);
+ }
+ }
+
+ /**
+ * Log the checksum for the the specified file and chunk
+ * within the file.
+ * @param fileId id associated with the file
+ * @param chunkId chunk within the file.
+ * @param buffer for which the checksum needs to be calculated.
+ * @throws IOException
+ */
+ void logChecksum(int fileId, int chunkId, byte[] buffer)
+ {
+ logChecksum(fileId, chunkId, buffer, 0, buffer.length);
+ }
+
+ /**
+ * Log the checksum for the the specified file and chunk
+ * within the file.
+ * @param fileId id associated with the file
+ * @param chunkId chunk within the file.
+ * @param buffer for which the checksum needs to be calculated.
+ * @param startoffset offset to start within the buffer
+ * @param length size of the checksum buffer.
+ * @throws IOException
+ */
+ void logChecksum(int fileId, int chunkId, byte[] buffer, int startOffset, int length)
+ {
+ try
+ {
+ adler_.update(buffer, startOffset, length);
+ long chksum = adler_.getValue();
+ adler_.reset();
+ /* log checksums to disk */
+ raf_.writeLong(chksum);
+ /* add the chksum to memory */
+ long key = ChecksumManager.key(fileId, chunkId);
+ chksums_.put(key, chksum);
+ }
+ catch ( IOException ex )
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ }
+
+ /**
+ * Validate checksums for the data in the buffer.
+ * @file name of the file from which data is being
+ * read.
+ * @chunkId chunkId
+ * @param buffer with data for which checksum needs to be
+ * verified.
+ * @throws IOException
+ */
+ void validateChecksum(String file, int chunkId, byte[] buffer) throws IOException
+ {
+ validateChecksum(file, chunkId, buffer, 0, buffer.length);
+ }
+
+ /**
+ * Validate checksums for the data in the buffer for the region
+ * that is encapsulated in the section object
+ * @file name of the file from which data is being
+ * read.
+ * @chunkId chunkId
+ * @param buffer with data for which checksum needs to be
+ * verified.
+ * @param startOffset within the buffer
+ * @param length of the data whose checksum needs to be verified.
+ * @throws IOException
+ */
+ void validateChecksum(String file, int chunkId, byte[] buffer, int startOffset, int length) throws IOException
+ {
+ int fId = SequenceFile.getFileId(file);
+ long key = ChecksumManager.key(fId, chunkId);
+ adler_.update(buffer, startOffset, length);
+ long currentChksum = adler_.getValue();
+ adler_.reset();
+ long oldChksum = chksums_.get(key);
+ if ( currentChksum != oldChksum )
+ {
+ throw new IOException("Checksums do not match in file " + file + " for chunk " + chunkId + ".");
+ }
+ }
+
+
+ /**
+ * Get the checksum for the specified file's chunk
+ * @param fileId id associated with the file.
+ * @param chunkId chunk within the file.
+ * @return associated checksum for the chunk
+ */
+ long getChecksum(int fileId, int chunkId)
+ {
+ long key = ChecksumManager.key(fileId, chunkId);
+ return chksums_.get(key);
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ ChecksumReader rdr = new ChecksumReader("C:\\Engagements\\Cassandra\\Checksum-1.db");
+ while ( !rdr.isEOF() )
+ {
+ System.out.println(rdr.readLong());
+ }
+ rdr.close();
+ }
+}
+
+/**
+ * ChecksumReader is used to memory map the checksum files and
+ * load the data into memory.
+ *
+ * @author alakshman
+ *
+ */
+class ChecksumReader
+{
+ private static Logger logger_ = Logger.getLogger(ChecksumReader.class);
+ private String filename_;
+ private MappedByteBuffer buffer_;
+
+ ChecksumReader(String filename) throws IOException
+ {
+ filename_ = filename;
+ File f = new File(filename);
+ map(0, f.length());
+ }
+
+ ChecksumReader(String filename, long start, long end) throws IOException
+ {
+ filename_ = filename;
+ map(start, end);
+ }
+
+ public void map() throws IOException
+ {
+ RandomAccessFile file = new RandomAccessFile(filename_, "rw");
+ try
+ {
+ buffer_ = file.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, file.length() );
+ buffer_.load();
+ }
+ finally
+ {
+ file.close();
+ }
+ }
+
+ public void map(long start, long end) throws IOException
+ {
+ if ( start < 0 || end < 0 || end < start )
+ throw new IllegalArgumentException("Invalid values for start and end.");
+
+ RandomAccessFile file = new RandomAccessFile(filename_, "rw");
+ try
+ {
+ if ( end == 0 )
+ end = file.length();
+ buffer_ = file.getChannel().map(FileChannel.MapMode.READ_ONLY, start, end);
+ buffer_.load();
+ }
+ finally
+ {
+ file.close();
+ }
+ }
+
+ void unmap(final Object buffer)
+ {
+ AccessController.doPrivileged( new PrivilegedAction<MappedByteBuffer>()
+ {
+ public MappedByteBuffer run()
+ {
+ try
+ {
+ Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
+ getCleanerMethod.setAccessible(true);
+ sun.misc.Cleaner cleaner = (sun.misc.Cleaner)getCleanerMethod.invoke(buffer,new Object[0]);
+ cleaner.clean();
+ }
+ catch(Throwable e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ return null;
+ }
+ });
+ }
+
+ public long readLong() throws IOException
+ {
+ return buffer_.getLong();
+ }
+
+ public boolean isEOF()
+ {
+ return ( buffer_.remaining() == 0 );
+ }
+
+
+ public void close() throws IOException
+ {
+ unmap(buffer_);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+
+/**
+ * A <code>ChecksumRandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access.
+ * <P>
+ *
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class ChecksumRandomAccessFile extends RandomAccessFile
+{
+ private static enum ChecksumOperations
+ {
+ LOG,
+ VERIFY
+ }
+
+ static final int LogBuffSz_ = 16; // 64K buffer
+ private static final int checksumSz_ = (1 << LogBuffSz_); // 64K
+ public static final int BuffSz_ = (1 << LogBuffSz_);
+ static final long BuffMask_ = ~(((long) BuffSz_) - 1L);
+
+ /*
+ * This implementation is based on the buffer implementation in Modula-3's
+ * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+ */
+ private boolean dirty_; // true iff unflushed bytes exist
+ private boolean closed_; // true iff the file is closed
+ private long curr_; // current position in file
+ private long lo_, hi_; // bounds on characters in "buff"
+ private byte[] buff_; // local buffer
+ private long maxHi_; // this.lo + this.buff.length
+ private boolean hitEOF_; // buffer contains last file block?
+ private long diskPos_; // disk position
+ private String filename_; // file name
+
+ /*
+ * To describe the above fields, we introduce the following abstractions for
+ * the file "f":
+ *
+ * len(f) the length of the file curr(f) the current position in the file
+ * c(f) the abstract contents of the file disk(f) the contents of f's
+ * backing disk file closed(f) true iff the file is closed
+ *
+ * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+ * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+ * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+ * operation has the effect of making "disk(f)" identical to "c(f)".
+ *
+ * A file is said to be *valid* if the following conditions hold:
+ *
+ * V1. The "closed" and "curr" fields are correct:
+ *
+ * f.closed == closed(f) f.curr == curr(f)
+ *
+ * V2. The current position is either contained in the buffer, or just past
+ * the buffer:
+ *
+ * f.lo <= f.curr <= f.hi
+ *
+ * V3. Any (possibly) unflushed characters are stored in "f.buff":
+ *
+ * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+ *
+ * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+ *
+ * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+ * disk(f)[i])
+ *
+ * V5. "f.dirty" is true iff the buffer contains bytes that should be
+ * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+ *
+ * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+ *
+ * V6. this.maxHi == this.lo + this.buff.length
+ *
+ * Note that "f.buff" can be "null" in a valid file, since the range of
+ * characters in V3 is empty when "f.lo == f.curr".
+ *
+ * A file is said to be *ready* if the buffer contains the current position,
+ * i.e., when:
+ *
+ * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+ *
+ * When a file is ready, reading or writing a single byte can be performed
+ * by reading or writing the in-memory buffer without performing a disk
+ * operation.
+ */
+
+ /**
+ * Open a new <code>BufferedRandomAccessFile</code> on <code>file</code>
+ * in mode <code>mode</code>, which should be "r" for reading only, or
+ * "rw" for reading and writing.
+ */
+ public ChecksumRandomAccessFile(File file, String mode) throws IOException
+ {
+ super(file, mode);
+ this.init(file.getAbsolutePath(), 0);
+ }
+
+ public ChecksumRandomAccessFile(File file, String mode, int size) throws IOException
+ {
+ super(file, mode);
+ this.init(file.getAbsolutePath(), size);
+ }
+
+ /**
+ * Open a new <code>BufferedRandomAccessFile</code> on the file named
+ * <code>name</code> in mode <code>mode</code>, which should be "r" for
+ * reading only, or "rw" for reading and writing.
+ */
+ public ChecksumRandomAccessFile(String name, String mode) throws IOException
+ {
+ super(name, mode);
+ this.init(name, 0);
+ }
+
+ public ChecksumRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
+ {
+ super(name, mode);
+ this.init(name, size);
+ }
+
+ private void init(String name, int size)
+ {
+ this.dirty_ = this.closed_ = false;
+ this.lo_ = this.curr_ = this.hi_ = 0;
+ this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
+ this.maxHi_ = (long) BuffSz_;
+ this.hitEOF_ = false;
+ this.diskPos_ = 0L;
+ this.filename_ = name;
+ }
+
+ public void close() throws IOException
+ {
+ this.flush();
+ this.closed_ = true;
+ super.close();
+ }
+
+ /**
+ * Flush any bytes in the file's buffer that have not yet been written to
+ * disk. If the file was created read-only, this method is a no-op.
+ */
+ public void flush() throws IOException
+ {
+ this.flushBuffer();
+ }
+
+ private void doChecksumOperation(ChecksumOperations chksumOps) throws IOException
+ {
+ int buffSz = buff_.length;
+ /*
+ * If the diskPos_ is at the buffer boundary then return
+ * diskPos_ - 1 else return the actual diskPos_.
+ */
+ long currentPosition = ( (diskPos_ % buffSz) == 0 ) ? diskPos_ - 1 : diskPos_;
+ /* Tells me which buffered chunk I am in. */
+ long chunk = (currentPosition / buffSz) + 1;
+ /* Number of checksum chunks within a buffer */
+ int chksumChunks = buff_.length / checksumSz_;
+ /* Position of the start of the previous buffer boundary */
+ long pos = (chunk == 0) ? 0 : (chunk - 1)*buffSz;
+ int startOffset = 0;
+ int chksumChunkId = (int)(chksumChunks*(chunk - 1) + 1);
+ do
+ {
+ int fId = SequenceFile.getFileId(filename_);
+ switch( chksumOps )
+ {
+ case LOG:
+ ChecksumManager.instance(filename_).logChecksum(fId, chksumChunkId++, buff_, startOffset, checksumSz_);
+ break;
+ case VERIFY:
+ ChecksumManager.instance(filename_).validateChecksum(filename_, chksumChunkId++, buff_, startOffset, checksumSz_);
+ break;
+ }
+ pos += checksumSz_;
+ startOffset += checksumSz_;
+ }
+ while ( pos < currentPosition );
+ }
+
+ /**
+ * Flush any dirty bytes in the buffer to disk.
+ */
+ private void flushBuffer() throws IOException
+ {
+ if (this.dirty_)
+ {
+ if (this.diskPos_ != this.lo_)
+ super.seek(this.lo_);
+ int len = (int) (this.curr_ - this.lo_);
+ super.write(this.buff_, 0, len);
+ this.diskPos_ = this.curr_;
+ this.dirty_ = false;
+ /* checksum the data before we write to disk */
+ doChecksumOperation(ChecksumOperations.LOG);
+ }
+ }
+
+ /**
+ * Read at most "this.buff.length" bytes into "this.buff", returning the
+ * number of bytes read. If the return result is less than
+ * "this.buff.length", then EOF was read.
+ */
+ private int fillBuffer() throws IOException
+ {
+ int cnt = 0;
+ int rem = this.buff_.length;
+ int n = -1;
+ while (rem > 0)
+ {
+ n = super.read(this.buff_, cnt, rem);
+ if (n < 0)
+ break;
+ cnt += n;
+ rem -= n;
+ }
+ if (this.hitEOF_ = (cnt < this.buff_.length))
+ {
+ // make sure buffer that wasn't read is initialized with -1
+ Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
+ }
+ this.diskPos_ += cnt;
+ return cnt;
+ }
+
+ /**
+ * This method positions <code>this.curr</code> at position <code>pos</code>.
+ * If <code>pos</code> does not fall in the current buffer, it flushes the
+ * current buffer and loads the correct one.<p>
+ *
+ * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
+ * is at or past the end-of-file, which can only happen if the file was
+ * opened in read-only mode.
+ */
+ public void seek(long pos) throws IOException
+ {
+ if (pos >= this.hi_ || pos < this.lo_)
+ {
+ // seeking outside of current buffer -- flush and read
+ this.flushBuffer();
+ this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+ this.maxHi_ = this.lo_ + (long) this.buff_.length;
+ if (this.diskPos_ != this.lo_)
+ {
+ super.seek(this.lo_);
+ this.diskPos_ = this.lo_;
+ }
+ int n = this.fillBuffer();
+ if ( n > 0 )
+ {
+ doChecksumOperation(ChecksumOperations.VERIFY);
+ }
+ this.hi_ = this.lo_ + (long) n;
+ }
+ else
+ {
+ // seeking inside current buffer -- no read required
+ if (pos < this.curr_)
+ {
+ // if seeking backwards, we must flush to maintain V4
+ this.flushBuffer();
+ }
+ }
+ this.curr_ = pos;
+ }
+
+ public long getFilePointer()
+ {
+ return this.curr_;
+ }
+
+ public long length() throws IOException
+ {
+ return Math.max(this.curr_, super.length());
+ }
+
+ public int read() throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ // test for EOF
+ // if (this.hi < this.maxHi) return -1;
+ if (this.hitEOF_)
+ return -1;
+
+ // slow path -- read another buffer
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ return -1;
+ }
+ byte res = this.buff_[(int) (this.curr_ - this.lo_)];
+ this.curr_++;
+ return ((int) res) & 0xFF; // convert byte -> int
+ }
+
+ public int read(byte[] b) throws IOException
+ {
+ return this.read(b, 0, b.length);
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ // test for EOF
+ // if (this.hi < this.maxHi) return -1;
+ if (this.hitEOF_)
+ return -1;
+
+ // slow path -- read another buffer
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ return -1;
+ }
+ len = Math.min(len, (int) (this.hi_ - this.curr_));
+ int buffOff = (int) (this.curr_ - this.lo_);
+ System.arraycopy(this.buff_, buffOff, b, off, len);
+ this.curr_ += len;
+ return len;
+ }
+
+ public void write(int b) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ if (this.hitEOF_ && this.hi_ < this.maxHi_)
+ {
+ // at EOF -- bump "hi"
+ this.hi_++;
+ }
+ else
+ {
+ // slow path -- write current buffer; read next one
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ {
+ // appending to EOF -- bump "hi"
+ this.hi_++;
+ }
+ }
+ }
+ this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+ this.curr_++;
+ this.dirty_ = true;
+ }
+
+ public void write(byte[] b) throws IOException
+ {
+ this.write(b, 0, b.length);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ while (len > 0)
+ {
+ int n = this.writeAtMost(b, off, len);
+ off += n;
+ len -= n;
+ }
+ this.dirty_ = true;
+ }
+
+ /*
+ * Write at most "len" bytes to "b" starting at position "off", and return
+ * the number of bytes written.
+ */
+ private int writeAtMost(byte[] b, int off, int len) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ if (this.hitEOF_ && this.hi_ < this.maxHi_)
+ {
+ // at EOF -- bump "hi"
+ this.hi_ = this.maxHi_;
+ }
+ else
+ {
+ // slow path -- write current buffer; read next one
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ {
+ // appending to EOF -- bump "hi"
+ this.hi_ = this.maxHi_;
+ }
+ }
+ }
+ len = Math.min(len, (int) (this.hi_ - this.curr_));
+ int buffOff = (int) (this.curr_ - this.lo_);
+ System.arraycopy(b, off, this.buff_, buffOff, len);
+ this.curr_ += len;
+ return len;
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+
+ RandomAccessFile raf = new ChecksumRandomAccessFile("C:\\Engagements\\Table-ColumnFamily-1-Data.dat", "rw");
+ byte[] bytes = new byte[32*1024];
+ Random random = new Random();
+
+ for ( int i = 0; i < 16; ++i )
+ {
+ random.nextBytes(bytes);
+ raf.write(bytes);
+ }
+ raf.close();
+
+ String file = "C:\\Engagements\\Checksum-1.db";
+ ChecksumManager.instance("C:\\Engagements\\Table-ColumnFamily-1-Data.dat", file);
+
+ raf = new ChecksumRandomAccessFile("C:\\Engagements\\Table-ColumnFamily-1-Data.dat", "rw", 4*1024*1024);
+ bytes = new byte[32*1024];
+
+ for ( int i = 0; i < 16; ++i )
+ {
+ raf.readFully(bytes);
+ }
+ raf.close();
+
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/Coordinate.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+/**
+ * Section of a file that needs to be scanned
+ * is represented by this class.
+*/
+class Coordinate
+{
+ long start_;
+ long end_;
+
+ Coordinate(long start, long end)
+ {
+ start_ = start;
+ end_ = end;
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/DataInputBuffer.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+/**
+ * A {@link DataInputStream} that reads from an in-memory byte array which can be
+ * swapped via the reset methods. Instances are not thread-safe.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class DataInputBuffer extends DataInputStream
+{
+    /**
+     * An unsynchronized byte-array input stream modelled on
+     * {@link ByteArrayInputStream}. The backing array is used directly and
+     * is never copied, and no method of this class takes a lock.
+     */
+    public static class FastByteArrayInputStream extends InputStream
+    {
+        /**
+         * Backing array supplied by the creator of the stream. Only
+         * {@code buf[0]} through {@code buf[count-1]} can ever be read;
+         * {@code buf[pos]} is the next byte to be read.
+         */
+        protected byte[] buf;
+
+        /**
+         * Index of the next byte to read from {@code buf}. It is expected
+         * to be non-negative and no larger than {@code count}.
+         */
+        protected int pos;
+
+        /**
+         * Position that {@link #reset()} rewinds to. It is zero unless an
+         * offset was passed to the constructor or {@link #mark(int)} has
+         * been called.
+         */
+        protected int mark = 0;
+
+        /**
+         * One past the index of the last readable byte in {@code buf}. It
+         * is expected to be non-negative and no larger than
+         * {@code buf.length}.
+         */
+        protected int count;
+
+        /**
+         * Creates a stream over an empty array, so every read reports end
+         * of stream until {@link #setBytes(byte[])} is called.
+         */
+        public FastByteArrayInputStream()
+        {
+            buf = new byte[0];
+        }
+
+        /**
+         * Creates a stream that reads all of {@code buf}. The array is not
+         * copied.
+         *
+         * @param buf
+         *            the input buffer.
+         */
+        public FastByteArrayInputStream(byte buf[])
+        {
+            this.buf = buf;
+            this.pos = 0;
+            this.count = buf.length;
+        }
+
+        /**
+         * Creates a stream that starts reading at {@code offset} and stops
+         * at the smaller of {@code offset + length} and {@code buf.length}.
+         * The array is not copied and the mark is set to {@code offset}.
+         *
+         * @param buf
+         *            the input buffer.
+         * @param offset
+         *            the index in the buffer of the first byte to read.
+         * @param length
+         *            the maximum number of bytes to read from the buffer.
+         */
+        public FastByteArrayInputStream(byte buf[], int offset, int length)
+        {
+            this.buf = buf;
+            this.pos = offset;
+            this.count = Math.min(offset + length, buf.length);
+            this.mark = offset;
+        }
+
+        /**
+         * Replaces the backing array and rewinds the read position to zero
+         * so that all of {@code bytes} is readable. The mark is left
+         * unchanged.
+         *
+         * @param bytes
+         *            the new input buffer; it is not copied.
+         */
+        public final void setBytes(byte[] bytes)
+        {
+            buf = bytes;
+            pos = 0;
+            count = bytes.length;
+        }
+
+        /**
+         * Reads the next byte. This method cannot block.
+         *
+         * @return the next byte as an {@code int} in the range {@code 0} to
+         *         {@code 255}, or {@code -1} if the end of the stream has
+         *         been reached.
+         */
+        @Override
+        public final int read()
+        {
+            return (pos < count) ? (buf[pos++] & 0xFF) : -1;
+        }
+
+        /**
+         * Copies up to {@code len} bytes into {@code b}, starting at
+         * {@code b[off]}. The number of bytes copied is the smaller of
+         * {@code len} and {@code count - pos}. This method cannot block.
+         *
+         * @param b
+         *            the buffer into which the data is read.
+         * @param off
+         *            the start offset in the destination array {@code b}.
+         * @param len
+         *            the maximum number of bytes to read.
+         * @return the number of bytes copied, {@code 0} if {@code len} is
+         *         zero and data is still available, or {@code -1} if the
+         *         end of the stream has been reached.
+         * @exception NullPointerException
+         *                If {@code b} is {@code null}.
+         * @exception IndexOutOfBoundsException
+         *                If {@code off} is negative, {@code len} is
+         *                negative, or {@code len} is greater than
+         *                {@code b.length - off}.
+         */
+        @Override
+        public final int read(byte b[], int off, int len)
+        {
+            if (b == null)
+            {
+                throw new NullPointerException();
+            }
+            if (off < 0 || len < 0 || len > b.length - off)
+            {
+                throw new IndexOutOfBoundsException();
+            }
+            if (pos >= count)
+            {
+                return -1;
+            }
+            // Clamp against the remaining byte count rather than testing
+            // pos + len, which can overflow int for very large arrays.
+            final long remaining = (long) count - pos;
+            if (len > remaining)
+            {
+                len = (int) remaining;
+            }
+            if (len <= 0)
+            {
+                return 0;
+            }
+            System.arraycopy(buf, pos, b, off, len);
+            pos += len;
+            return len;
+        }
+
+        /**
+         * Skips up to {@code n} bytes. The number of bytes skipped is the
+         * smaller of {@code n} and {@code count - pos}.
+         *
+         * @param n
+         *            the number of bytes to be skipped.
+         * @return the number of bytes actually skipped; {@code 0} when
+         *         {@code n} is not positive or nothing is left to skip.
+         */
+        @Override
+        public final long skip(long n)
+        {
+            // Work from the remaining byte count: the previous pos + n test
+            // overflowed for huge n (e.g. Long.MAX_VALUE) and corrupted pos.
+            final long remaining = (long) count - pos;
+            if (n <= 0 || remaining <= 0)
+            {
+                return 0;
+            }
+            final long skipped = Math.min(n, remaining);
+            pos += (int) skipped;
+            return skipped;
+        }
+
+        /**
+         * Returns the number of bytes that can still be read or skipped.
+         *
+         * @return {@code count - pos}.
+         */
+        @Override
+        public final int available()
+        {
+            return count - pos;
+        }
+
+        /**
+         * Reports that mark/reset is supported.
+         *
+         * @return always {@code true}.
+         */
+        @Override
+        public final boolean markSupported()
+        {
+            return true;
+        }
+
+        /**
+         * Remembers the current read position so that {@link #reset()} can
+         * return to it.
+         *
+         * @param readAheadLimit
+         *            ignored; it has no meaning for this class.
+         */
+        @Override
+        public final void mark(int readAheadLimit)
+        {
+            mark = pos;
+        }
+
+        /**
+         * Moves the read position back to the marked position, which is 0
+         * unless another position was marked or an offset was given to the
+         * constructor.
+         */
+        @Override
+        public final void reset()
+        {
+            pos = mark;
+        }
+
+        /**
+         * Does nothing. The stream remains fully usable after it has been
+         * closed.
+         */
+        @Override
+        public final void close() throws IOException
+        {
+        }
+    }
+
+    /**
+     * The stream wrapped by the enclosing {@link DataInputBuffer}. It adds
+     * the ability to re-point the stream at another array region and to
+     * read or change the current position.
+     */
+    private static class Buffer extends FastByteArrayInputStream
+    {
+        /** Creates a buffer over an empty array. */
+        public Buffer()
+        {
+            super(new byte[] {});
+        }
+
+        /**
+         * Points this buffer at {@code input} so that reading starts at
+         * index {@code start} and ends before {@code start + length}. The
+         * mark is moved to {@code start}. The arguments are not validated
+         * against {@code input.length}.
+         */
+        public void reset(byte[] input, int start, int length)
+        {
+            this.buf = input;
+            this.count = start + length;
+            this.mark = start;
+            this.pos = start;
+        }
+
+        /** Returns the index of the next byte to be read. */
+        public int getPosition()
+        {
+            return pos;
+        }
+
+        /** Sets the index of the next byte to be read; the value is not validated. */
+        public void setPosition(int position)
+        {
+            pos = position;
+        }
+
+        /** Returns the end index of the readable region, not a byte count. */
+        public int getLength()
+        {
+            return count;
+        }
+    }
+
+    /* The same stream instance that was handed to the DataInputStream superclass. */
+    private final Buffer buffer_;
+
+    /** Constructs a new empty buffer. */
+    public DataInputBuffer()
+    {
+        this(new Buffer());
+    }
+
+    /* Keeps a typed reference to the wrapped stream so it can be reset later. */
+    private DataInputBuffer(Buffer buffer)
+    {
+        super(buffer);
+        this.buffer_ = buffer;
+    }
+
+    /**
+     * Resets the data that the buffer reads to the first {@code length}
+     * bytes of {@code input}.
+     */
+    public void reset(byte[] input, int length)
+    {
+        buffer_.reset(input, 0, length);
+    }
+
+    /**
+     * Resets the data that the buffer reads to {@code length} bytes of
+     * {@code input} beginning at index {@code start}.
+     */
+    public void reset(byte[] input, int start, int length)
+    {
+        buffer_.reset(input, start, length);
+    }
+
+    /** Returns the current position in the input. */
+    public int getPosition()
+    {
+        return buffer_.getPosition();
+    }
+
+    /** Sets the position within the input; the value is not validated. */
+    public void setPosition(int position)
+    {
+        buffer_.setPosition(position);
+    }
+
+    /**
+     * Returns the end index of the readable input, i.e. {@code start + length}
+     * of the most recent reset. This equals the input length only when
+     * reading started at index 0.
+     */
+    public int getLength()
+    {
+        return buffer_.getLength();
+    }
+}