You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [19/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/ChecksumRandomAccessFile.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+
+/**
+ * A <code>ChecksumRandomAccessFile</code> is like a
+ * <code>RandomAccessFile</code>, but it uses a private buffer so that most
+ * operations do not require a disk access.
+ * <P>
+ *
+ * Note: The operations on this class are unmonitored. Also, the correct
+ * functioning of the <code>RandomAccessFile</code> methods that are not
+ * overridden here relies on the implementation of those methods in the
+ * superclass.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class ChecksumRandomAccessFile extends RandomAccessFile
+{
+ private static enum ChecksumOperations
+ {
+ LOG, // doChecksumOperation() calls ChecksumManager.logChecksum for this value
+ VERIFY // doChecksumOperation() calls ChecksumManager.validateChecksum for this value
+ }
+
+ static final int LogBuffSz_ = 16; // log2 of the default buffer size (2^16 = 64K)
+ private static final int checksumSz_ = (1 << LogBuffSz_); // bytes passed to each checksum call: 64K
+ public static final int BuffSz_ = (1 << LogBuffSz_); // default buffer size, also the minimum that init() allocates: 64K
+ static final long BuffMask_ = ~(((long) BuffSz_) - 1L); // clears the low LogBuffSz_ bits, rounding a position down to a BuffSz_ boundary
+
+ /*
+ * This implementation is based on the buffer implementation in Modula-3's
+ * "Rd", "Wr", "RdClass", and "WrClass" interfaces.
+ */
+ private boolean dirty_; // true iff unflushed bytes exist
+ private boolean closed_; // true iff the file is closed
+ private long curr_; // current position in file
+ private long lo_, hi_; // bounds on characters in "buff"
+ private byte[] buff_; // local buffer
+ private long maxHi_; // this.lo + this.buff.length
+ private boolean hitEOF_; // buffer contains last file block?
+ private long diskPos_; // disk position
+ private String filename_; // file name
+
+ /*
+ * To describe the above fields, we introduce the following abstractions for
+ * the file "f":
+ *
+ *   len(f)     the length of the file
+ *   curr(f)    the current position in the file
+ *   c(f)       the abstract contents of the file
+ *   disk(f)    the contents of f's backing disk file
+ *   closed(f)  true iff the file is closed
+ *
+ * "curr(f)" is an index in the closed interval [0, len(f)]. "c(f)" is a
+ * character sequence of length "len(f)". "c(f)" and "disk(f)" may differ if
+ * "c(f)" contains unflushed writes not reflected in "disk(f)". The flush
+ * operation has the effect of making "disk(f)" identical to "c(f)".
+ *
+ * A file is said to be *valid* if the following conditions hold:
+ *
+ * V1. The "closed" and "curr" fields are correct:
+ *
+ * f.closed == closed(f) f.curr == curr(f)
+ *
+ * V2. The current position is either contained in the buffer, or just past
+ * the buffer:
+ *
+ * f.lo <= f.curr <= f.hi
+ *
+ * V3. Any (possibly) unflushed characters are stored in "f.buff":
+ *
+ * (forall i in [f.lo, f.curr): c(f)[i] == f.buff[i - f.lo])
+ *
+ * V4. For all characters not covered by V3, c(f) and disk(f) agree:
+ *
+ * (forall i in [f.lo, len(f)): i not in [f.lo, f.curr) => c(f)[i] ==
+ * disk(f)[i])
+ *
+ * V5. "f.dirty" is true iff the buffer contains bytes that should be
+ * flushed to the file; by V3 and V4, only part of the buffer can be dirty.
+ *
+ * f.dirty == (exists i in [f.lo, f.curr): c(f)[i] != f.buff[i - f.lo])
+ *
+ * V6. this.maxHi == this.lo + this.buff.length
+ *
+ * Note that "f.buff" can be "null" in a valid file, since the range of
+ * characters in V3 is empty when "f.lo == f.curr".
+ *
+ * A file is said to be *ready* if the buffer contains the current position,
+ * i.e., when:
+ *
+ * R1. !f.closed && f.buff != null && f.lo <= f.curr && f.curr < f.hi
+ *
+ * When a file is ready, reading or writing a single byte can be performed
+ * by reading or writing the in-memory buffer without performing a disk
+ * operation.
+ */
+
+ /**
+ * Open a new <code>ChecksumRandomAccessFile</code> on <code>file</code>
+ * in mode <code>mode</code>, which should be "r" for reading only, or
+ * "rw" for reading and writing. A buffer size of 0 is requested, so the
+ * default buffer of <code>BuffSz_</code> bytes is allocated.
+ *
+ * @param file the file to open; its absolute path is remembered as the file name
+ * @param mode access mode, passed unchanged to <code>RandomAccessFile</code>
+ * @throws IOException if the superclass constructor cannot open the file
+ */
+ public ChecksumRandomAccessFile(File file, String mode) throws IOException
+ {
+ super(file, mode);
+ this.init(file.getAbsolutePath(), 0);
+ }
+
+ /**
+ * Open a new <code>ChecksumRandomAccessFile</code> on <code>file</code>
+ * in mode <code>mode</code> with a caller-chosen buffer size.
+ *
+ * @param file the file to open; its absolute path is remembered as the file name
+ * @param mode access mode, passed unchanged to <code>RandomAccessFile</code>
+ * @param size requested buffer size in bytes; only honoured when it is
+ * larger than <code>BuffSz_</code>, otherwise <code>BuffSz_</code> is used
+ * @throws IOException if the superclass constructor cannot open the file
+ */
+ public ChecksumRandomAccessFile(File file, String mode, int size) throws IOException
+ {
+ super(file, mode);
+ this.init(file.getAbsolutePath(), size);
+ }
+
+ /**
+ * Open a new <code>ChecksumRandomAccessFile</code> on the file named
+ * <code>name</code> in mode <code>mode</code>, which should be "r" for
+ * reading only, or "rw" for reading and writing. A buffer size of 0 is
+ * requested, so the default buffer of <code>BuffSz_</code> bytes is
+ * allocated.
+ *
+ * @param name name of the file to open; stored as given
+ * @param mode access mode, passed unchanged to <code>RandomAccessFile</code>
+ * @throws IOException if the superclass constructor cannot open the file
+ */
+ public ChecksumRandomAccessFile(String name, String mode) throws IOException
+ {
+ super(name, mode);
+ this.init(name, 0);
+ }
+
+ /**
+ * Open a new <code>ChecksumRandomAccessFile</code> on the file named
+ * <code>name</code> in mode <code>mode</code> with a caller-chosen buffer
+ * size.
+ *
+ * @param name name of the file to open; stored as given
+ * @param mode access mode, passed unchanged to <code>RandomAccessFile</code>
+ * @param size requested buffer size in bytes; only honoured when it is
+ * larger than <code>BuffSz_</code>, otherwise <code>BuffSz_</code> is used
+ * @throws FileNotFoundException if the superclass constructor cannot open the file
+ */
+ public ChecksumRandomAccessFile(String name, String mode, int size) throws FileNotFoundException
+ {
+ super(name, mode);
+ this.init(name, size);
+ }
+
+ /**
+ * Resets the buffer bookkeeping for a freshly opened file and allocates
+ * the local buffer.
+ *
+ * @param name file name, kept so the checksum code can identify the file
+ * @param size requested buffer size in bytes; any value not larger than
+ *             <code>BuffSz_</code> (including 0 and negative values)
+ *             falls back to <code>BuffSz_</code>
+ */
+ private void init(String name, int size)
+ {
+     this.filename_ = name;
+     this.dirty_ = false;
+     this.closed_ = false;
+     this.hitEOF_ = false;
+     this.lo_ = this.curr_ = this.hi_ = 0;
+     this.diskPos_ = 0L;
+     this.buff_ = new byte[Math.max(size, BuffSz_)];
+     // maxHi must equal lo + buff.length. The buffer can be larger than
+     // BuffSz_, so derive the bound from the array that was really allocated.
+     this.maxHi_ = this.lo_ + (long) this.buff_.length;
+ }
+
+ /**
+ * Flushes any buffered writes and closes the underlying file.
+ * The file is marked closed and the descriptor is released even when the
+ * flush fails, so a failed flush cannot leak the open file.
+ *
+ * @throws IOException if flushing or closing the underlying file fails
+ */
+ public void close() throws IOException
+ {
+     try
+     {
+         this.flush();
+     }
+     finally
+     {
+         this.closed_ = true;
+         super.close();
+     }
+ }
+
+ /**
+ * Flush any bytes in the file's buffer that have not yet been written to
+ * disk. If the file was created read-only, this method is a no-op.
+ * This is a public entry point that simply delegates to
+ * <code>flushBuffer()</code>.
+ *
+ * @throws IOException if writing the buffered bytes fails
+ */
+ public void flush() throws IOException
+ {
+ this.flushBuffer();
+ }
+
+ /**
+ * Logs or validates checksums for the contents of <code>buff_</code>,
+ * walking the buffer in slices of <code>checksumSz_</code> bytes. The
+ * chunk ids handed to <code>ChecksumManager</code> are derived from
+ * <code>diskPos_</code>, so callers must update <code>diskPos_</code>
+ * before invoking this method. The loop is a do/while, so at least one
+ * slice is always processed.
+ *
+ * NOTE(review): every call passes a full <code>checksumSz_</code> bytes
+ * starting at <code>startOffset</code>, regardless of how many bytes were
+ * actually read or written -- confirm this is intended for partial buffers.
+ *
+ * @param chksumOps LOG to record checksums, VERIFY to validate them
+ * @throws IOException declared for the ChecksumManager calls
+ */
+ private void doChecksumOperation(ChecksumOperations chksumOps) throws IOException
+ {
+ int buffSz = buff_.length;
+ /*
+ * If the diskPos_ is at the buffer boundary then return
+ * diskPos_ - 1 else return the actual diskPos_.
+ */
+ long currentPosition = ( (diskPos_ % buffSz) == 0 ) ? diskPos_ - 1 : diskPos_; // yields -1 when diskPos_ is 0
+ /* Tells me which buffered chunk I am in. */
+ long chunk = (currentPosition / buffSz) + 1; // 1-based index of the buffer-sized chunk
+ /* Number of checksum chunks within a buffer */
+ int chksumChunks = buff_.length / checksumSz_;
+ /* Position of the start of the previous buffer boundary */
+ long pos = (chunk == 0) ? 0 : (chunk - 1)*buffSz; // NOTE(review): chunk looks like it is always >= 1 here, so the 0 branch appears unreachable -- confirm
+ int startOffset = 0; // offset into buff_ of the slice being checksummed
+ int chksumChunkId = (int)(chksumChunks*(chunk - 1) + 1); // id of the first checksum chunk of this buffer
+ do
+ {
+ int fId = SequenceFile.getFileId(filename_); // looked up on every iteration; only the LOG branch uses it
+ switch( chksumOps )
+ {
+ case LOG:
+ ChecksumManager.instance(filename_).logChecksum(fId, chksumChunkId++, buff_, startOffset, checksumSz_);
+ break;
+ case VERIFY:
+ ChecksumManager.instance(filename_).validateChecksum(filename_, chksumChunkId++, buff_, startOffset, checksumSz_);
+ break;
+ }
+ pos += checksumSz_;
+ startOffset += checksumSz_;
+ }
+ while ( pos < currentPosition );
+ }
+
+ /**
+ * Flush any dirty bytes in the buffer to disk. Does nothing when the
+ * buffer is clean. Otherwise the bytes in [lo_, curr_) are written at
+ * file offset lo_ and their checksums are logged afterwards.
+ *
+ * @throws IOException if seeking, writing or checksum logging fails
+ */
+ private void flushBuffer() throws IOException
+ {
+ if (this.dirty_)
+ {
+ if (this.diskPos_ != this.lo_)
+ super.seek(this.lo_); // move the real file pointer to the start of the buffered region
+ int len = (int) (this.curr_ - this.lo_);
+ super.write(this.buff_, 0, len);
+ this.diskPos_ = this.curr_;
+ this.dirty_ = false;
+ /* log checksums for the data just written; this must follow the diskPos_ update above because doChecksumOperation() derives chunk ids from diskPos_ */
+ doChecksumOperation(ChecksumOperations.LOG);
+ }
+ }
+
+ /**
+ * Read at most "this.buff.length" bytes into "this.buff", returning the
+ * number of bytes read. If the return result is less than
+ * "this.buff.length", then EOF was read.
+ */
+ private int fillBuffer() throws IOException
+ {
+ int cnt = 0;
+ int rem = this.buff_.length;
+ int n = -1;
+ while (rem > 0)
+ {
+ n = super.read(this.buff_, cnt, rem);
+ if (n < 0)
+ break;
+ cnt += n;
+ rem -= n;
+ }
+ if (this.hitEOF_ = (cnt < this.buff_.length))
+ {
+ // make sure buffer that wasn't read is initialized with -1
+ Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
+ }
+ this.diskPos_ += cnt;
+ return cnt;
+ }
+
+ /**
+ * This method positions <code>this.curr</code> at position <code>pos</code>.
+ * If <code>pos</code> does not fall in the current buffer, it flushes the
+ * current buffer and loads the correct one.<p>
+ *
+ * On exit from this routine <code>this.curr == this.hi</code> iff <code>pos</code>
+ * is at or past the end-of-file, which can only happen if the file was
+ * opened in read-only mode.<p>
+ *
+ * Whenever a new buffer is loaded and at least one byte was read, the
+ * buffer's checksums are validated.
+ *
+ * NOTE(review): <code>lo_</code> is aligned to <code>BuffSz_</code>, not to
+ * <code>buff_.length</code>; confirm this is intended when a larger buffer
+ * was requested.
+ *
+ * @param pos absolute file position to move to
+ * @throws IOException if flushing, seeking, reading or checksum validation fails
+ */
+ public void seek(long pos) throws IOException
+ {
+ if (pos >= this.hi_ || pos < this.lo_)
+ {
+ // seeking outside of current buffer -- flush and read
+ this.flushBuffer();
+ this.lo_ = pos & BuffMask_; // start at BuffSz boundary
+ this.maxHi_ = this.lo_ + (long) this.buff_.length;
+ if (this.diskPos_ != this.lo_)
+ {
+ super.seek(this.lo_);
+ this.diskPos_ = this.lo_;
+ }
+ int n = this.fillBuffer();
+ if ( n > 0 ) // nothing was read, so there is nothing to verify
+ {
+ doChecksumOperation(ChecksumOperations.VERIFY);
+ }
+ this.hi_ = this.lo_ + (long) n; // buffer covers exactly the bytes that were read
+ }
+ else
+ {
+ // seeking inside current buffer -- no read required
+ if (pos < this.curr_)
+ {
+ // if seeking backwards, we must flush to maintain V4
+ this.flushBuffer();
+ }
+ }
+ this.curr_ = pos;
+ }
+
+ /**
+ * Returns the logical current position <code>curr_</code> maintained by
+ * this class, not the file pointer of the underlying file.
+ */
+ public long getFilePointer()
+ {
+ return this.curr_;
+ }
+
+ /**
+ * Returns the larger of the current logical position and the length
+ * reported by the underlying file.
+ * NOTE(review): presumably the maximum is taken because buffered writes
+ * that have not been flushed can put <code>curr_</code> past the on-disk
+ * length -- confirm.
+ *
+ * @throws IOException if the underlying length cannot be obtained
+ */
+ public long length() throws IOException
+ {
+ return Math.max(this.curr_, super.length());
+ }
+
+ /**
+ * Reads one byte at the current position and advances the position.
+ * The byte is served from the buffer; a new buffer is loaded through
+ * <code>seek()</code> only when the position has reached the buffer end.
+ *
+ * @return the byte as a value in 0..255, or -1 at end of file
+ * @throws IOException if loading the next buffer fails
+ */
+ public int read() throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ // test for EOF
+ // if (this.hi < this.maxHi) return -1;
+ if (this.hitEOF_)
+ return -1;
+
+ // slow path -- read another buffer
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_) // the reload produced no bytes at this position
+ return -1;
+ }
+ byte res = this.buff_[(int) (this.curr_ - this.lo_)];
+ this.curr_++;
+ return ((int) res) & 0xFF; // convert byte -> int
+ }
+
+ /**
+ * Reads up to <code>b.length</code> bytes into <code>b</code>; delegates
+ * to <code>read(b, 0, b.length)</code>.
+ *
+ * @param b destination array
+ * @return the value returned by <code>read(b, 0, b.length)</code>
+ * @throws IOException if loading a buffer fails
+ */
+ public int read(byte[] b) throws IOException
+ {
+ return this.read(b, 0, b.length);
+ }
+
+ /**
+ * Reads up to <code>len</code> bytes into <code>b</code> starting at
+ * <code>off</code>. At most the bytes remaining in the current buffer are
+ * returned, so the result can be smaller than <code>len</code> even when
+ * more data exists in the file.
+ *
+ * @param b   destination array
+ * @param off offset in <code>b</code> of the first byte stored
+ * @param len maximum number of bytes to read
+ * @return the number of bytes read, 0 if <code>len</code> is 0, or -1 at
+ *         end of file
+ * @throws IOException if loading the next buffer fails
+ */
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+     // A zero-length request must report 0, never EOF. Otherwise the
+     // inherited readFully(b, off, 0) throws EOFException at end of file.
+     if (len == 0)
+         return 0;
+
+     if (this.curr_ >= this.hi_)
+     {
+         // test for EOF
+         if (this.hitEOF_)
+             return -1;
+
+         // slow path -- read another buffer
+         this.seek(this.curr_);
+         if (this.curr_ == this.hi_)
+             return -1;
+     }
+     len = Math.min(len, (int) (this.hi_ - this.curr_));
+     int buffOff = (int) (this.curr_ - this.lo_);
+     System.arraycopy(this.buff_, buffOff, b, off, len);
+     this.curr_ += len;
+     return len;
+ }
+
+ /**
+ * Writes one byte at the current position into the buffer, advances the
+ * position and marks the buffer dirty. The byte reaches the disk only
+ * when the buffer is flushed.
+ *
+ * @param b the byte to write; it is narrowed with a <code>(byte)</code> cast
+ * @throws IOException if flushing the current buffer or loading the next one fails
+ */
+ public void write(int b) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ if (this.hitEOF_ && this.hi_ < this.maxHi_)
+ {
+ // at EOF -- bump "hi"
+ this.hi_++;
+ }
+ else
+ {
+ // slow path -- write current buffer; read next one
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ {
+ // appending to EOF -- bump "hi"
+ this.hi_++;
+ }
+ }
+ }
+ this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
+ this.curr_++;
+ this.dirty_ = true;
+ }
+
+ /**
+ * Writes all of <code>b</code> at the current position; delegates to
+ * <code>write(b, 0, b.length)</code>.
+ *
+ * @param b the bytes to write
+ * @throws IOException if flushing or loading a buffer fails
+ */
+ public void write(byte[] b) throws IOException
+ {
+ this.write(b, 0, b.length);
+ }
+
+ /**
+ * Writes <code>len</code> bytes from <code>b</code>, starting at
+ * <code>off</code>, at the current position. The data is copied into the
+ * buffer one buffer-sized piece at a time.
+ *
+ * @param b   source array
+ * @param off offset in <code>b</code> of the first byte to write
+ * @param len number of bytes to write; nothing happens when it is not positive
+ * @throws IOException if flushing the current buffer or loading the next one fails
+ */
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+     while (len > 0)
+     {
+         int n = this.writeAtMost(b, off, len);
+         off += n;
+         len -= n;
+         // Mark the buffer dirty right after each piece. The next
+         // writeAtMost() may seek() to a new buffer, and flushBuffer() only
+         // writes when dirty_ is set. Setting the flag once after the loop
+         // dropped the first buffer of a write that crossed a buffer boundary.
+         this.dirty_ = true;
+     }
+ }
+
+ /*
+ * Copy at most "len" bytes from "b", starting at position "off", into the
+ * buffer at the current file position, and return the number of bytes
+ * copied. The count is limited to the space left in the current buffer.
+ * This method does not set "dirty"; the caller is responsible for that.
+ */
+ private int writeAtMost(byte[] b, int off, int len) throws IOException
+ {
+ if (this.curr_ >= this.hi_)
+ {
+ if (this.hitEOF_ && this.hi_ < this.maxHi_)
+ {
+ // at EOF -- bump "hi"
+ this.hi_ = this.maxHi_;
+ }
+ else
+ {
+ // slow path -- write current buffer; read next one
+ this.seek(this.curr_);
+ if (this.curr_ == this.hi_)
+ {
+ // appending to EOF -- bump "hi"
+ this.hi_ = this.maxHi_;
+ }
+ }
+ }
+ len = Math.min(len, (int) (this.hi_ - this.curr_)); // never copy past the end of the buffer
+ int buffOff = (int) (this.curr_ - this.lo_);
+ System.arraycopy(b, off, this.buff_, buffOff, len);
+ this.curr_ += len;
+ return len;
+ }
+
+ /**
+ * Ad-hoc manual driver: writes 16 blocks of 32K random bytes through a
+ * ChecksumRandomAccessFile, calls ChecksumManager.instance with the data
+ * file and a checksum file, then re-opens the data file with a 4MB buffer
+ * and reads 16 blocks of 32K back.
+ * NOTE(review): all paths are hard-coded Windows locations, so this only
+ * runs on a machine that has them; neither file is closed if an
+ * exception is thrown part-way.
+ */
+ public static void main(String[] args) throws Throwable
+ {
+
+ RandomAccessFile raf = new ChecksumRandomAccessFile("C:\\Engagements\\Table-ColumnFamily-1-Data.dat", "rw");
+ byte[] bytes = new byte[32*1024];
+ Random random = new Random();
+
+ for ( int i = 0; i < 16; ++i )
+ {
+ random.nextBytes(bytes);
+ raf.write(bytes);
+ }
+ raf.close();
+
+ String file = "C:\\Engagements\\Checksum-1.db";
+ ChecksumManager.instance("C:\\Engagements\\Table-ColumnFamily-1-Data.dat", file); // presumably associates the checksum file with the data file -- verify against ChecksumManager
+
+ raf = new ChecksumRandomAccessFile("C:\\Engagements\\Table-ColumnFamily-1-Data.dat", "rw", 4*1024*1024);
+ bytes = new byte[32*1024];
+
+ for ( int i = 0; i < 16; ++i )
+ {
+ raf.readFully(bytes);
+ }
+ raf.close();
+
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io;
+
+/**
+ * Section of a file that needs to be scanned
+ * is represented by this class. It is a plain package-private holder for
+ * a start and an end value; no validation is performed on either.
+*/
+class Coordinate
+{
+ long start_; // value passed as "start" to the constructor
+ long end_; // value passed as "end" to the constructor
+
+ Coordinate(long start, long end)
+ {
+ start_ = start;
+ end_ = end;
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/DataInputBuffer.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+/**
+ * An implementation of the DataInputStream interface. This instance is completely thread
+ * unsafe.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public final class DataInputBuffer extends DataInputStream
+{
+ /*
+ * This is a clone of the ByteArrayInputStream class w/o any
+ * method being synchronized.
+ */
+ public static class FastByteArrayInputStream extends InputStream
+ {
+ /**
+ * An array of bytes that was provided by the creator of the stream.
+ * Elements <code>buf[0]</code> through <code>buf[count-1]</code> are
+ * the only bytes that can ever be read from the stream; element
+ * <code>buf[pos]</code> is the next byte to be read.
+ */
+ protected byte buf[];
+
+ /**
+ * The index of the next character to read from the input stream buffer.
+ * This value should always be nonnegative and not larger than the value of
+ * <code>count</code>. The next byte to be read from the input stream
+ * buffer will be <code>buf[pos]</code>.
+ */
+ protected int pos;
+
+ /**
+ * The currently marked position in the stream. ByteArrayInputStream objects
+ * are marked at position zero by default when constructed. They may be
+ * marked at another position within the buffer by the <code>mark()</code>
+ * method. The current buffer position is set to this point by the
+ * <code>reset()</code> method.
+ * <p>
+ * If no mark has been set, then the value of mark is the offset passed to
+ * the constructor (or 0 if the offset was not supplied).
+ *
+ * @since JDK1.1
+ */
+ protected int mark = 0;
+
+ /**
+ * The index one greater than the last valid character in the input stream
+ * buffer. This value should always be nonnegative and not larger than the
+ * length of <code>buf</code>. It is one greater than the position of the
+ * last byte within <code>buf</code> that can ever be read from the input
+ * stream buffer.
+ */
+ protected int count;
+
+ /**
+ * Creates a stream over an empty byte array. <code>pos</code> and
+ * <code>count</code> keep their default value of 0, so every read
+ * reports end of stream until a buffer is supplied.
+ */
+ public FastByteArrayInputStream()
+ {
+ buf = new byte[0];
+ }
+
+ /**
+ * Creates a <code>FastByteArrayInputStream</code> so that it uses
+ * <code>buf</code> as its buffer array. The buffer array is not copied.
+ * The initial value of <code>pos</code> is <code>0</code> and the
+ * initial value of <code>count</code> is the length of <code>buf</code>.
+ *
+ * @param buf
+ * the input buffer; must not be <code>null</code> because its
+ * length is read
+ */
+ public FastByteArrayInputStream(byte buf[])
+ {
+ this.buf = buf;
+ this.pos = 0;
+ this.count = buf.length;
+ }
+
+ /**
+ * Creates a <code>FastByteArrayInputStream</code> that uses <code>buf</code>
+ * as its buffer array. The initial value of <code>pos</code> is
+ * <code>offset</code> and the initial value of <code>count</code> is
+ * the minimum of <code>offset+length</code> and <code>buf.length</code>.
+ * The buffer array is not copied. The buffer's mark is set to the specified
+ * offset.
+ *
+ * @param buf
+ * the input buffer.
+ * @param offset
+ * the offset in the buffer of the first byte to read.
+ * @param length
+ * the maximum number of bytes to read from the buffer.
+ */
+ public FastByteArrayInputStream(byte buf[], int offset, int length)
+ {
+ this.buf = buf;
+ this.pos = offset;
+ this.count = Math.min(offset + length, buf.length); // clamp so reads never run past the array
+ this.mark = offset;
+ }
+
+ /**
+ * Replaces the buffer this stream reads from. Reading restarts at index
+ * 0, the whole array becomes readable, and the mark is moved back to 0
+ * so that a later <code>reset()</code> cannot jump to a position that
+ * belonged to the previous buffer.
+ *
+ * @param bytes the new buffer; it is not copied and must not be
+ *              <code>null</code>
+ */
+ public final void setBytes(byte[] bytes)
+ {
+     buf = bytes;
+     pos = 0;
+     mark = 0;
+     count = bytes.length;
+ }
+
+ /**
+ * Reads the next byte of data from this input stream. The value byte is
+ * returned as an <code>int</code> in the range <code>0</code> to
+ * <code>255</code>. If no byte is available because the end of the
+ * stream has been reached, the value <code>-1</code> is returned.
+ * <p>
+ * This <code>read</code> method cannot block.
+ *
+ * @return the next byte of data, or <code>-1</code> if the end of the
+ * stream has been reached.
+ */
+ public final int read()
+ {
+ return (pos < count) ? ( buf[pos++] & 0xFF ) : -1;
+ }
+
+ /**
+ * Reads up to <code>len</code> bytes of data into an array of bytes from
+ * this input stream. If <code>pos</code> is at or past <code>count</code>,
+ * <code>-1</code> is returned to indicate end of stream. Otherwise the
+ * number <code>k</code> of bytes read is the smaller of <code>len</code>
+ * and <code>count-pos</code>; those bytes are copied from
+ * <code>buf[pos]</code> onward into <code>b[off]</code> onward,
+ * <code>k</code> is added to <code>pos</code> and <code>k</code> is
+ * returned.
+ * <p>
+ * This <code>read</code> method cannot block.
+ *
+ * @param b
+ *            the buffer into which the data is read.
+ * @param off
+ *            the start offset in the destination array <code>b</code>
+ * @param len
+ *            the maximum number of bytes read.
+ * @return the total number of bytes read into the buffer, or
+ *         <code>-1</code> if there is no more data because the end of the
+ *         stream has been reached.
+ * @exception NullPointerException
+ *                If <code>b</code> is <code>null</code>.
+ * @exception IndexOutOfBoundsException
+ *                If <code>off</code> is negative, <code>len</code> is
+ *                negative, or <code>len</code> is greater than
+ *                <code>b.length - off</code>
+ */
+ public final int read(byte b[], int off, int len)
+ {
+     if (b == null)
+     {
+         throw new NullPointerException();
+     }
+     else if (off < 0 || len < 0 || len > b.length - off)
+     {
+         throw new IndexOutOfBoundsException();
+     }
+     if (pos >= count)
+     {
+         return -1;
+     }
+     // Compare against the remaining byte count instead of computing
+     // pos + len, which can overflow int for very large values.
+     int remaining = count - pos;
+     if (len > remaining)
+     {
+         len = remaining;
+     }
+     if (len <= 0)
+     {
+         return 0;
+     }
+     System.arraycopy(buf, pos, b, off, len);
+     pos += len;
+     return len;
+ }
+
+ /**
+ * Skips <code>n</code> bytes of input from this input stream. Fewer bytes
+ * might be skipped if the end of the input stream is reached. The actual
+ * number <code>k</code> of bytes to be skipped is equal to the smaller of
+ * <code>n</code> and <code>count-pos</code>. The value <code>k</code>
+ * is added into <code>pos</code> and <code>k</code> is returned.
+ *
+ * @param n
+ * the number of bytes to be skipped.
+ * @return the actual number of bytes skipped.
+ */
+ public final long skip(long n)
+ {
+ if (pos + n > count)
+ {
+ n = count - pos;
+ }
+ if (n < 0)
+ {
+ return 0;
+ }
+ pos += n;
+ return n;
+ }
+
+ /**
+ * Returns the number of remaining bytes that can be read (or skipped over)
+ * from this input stream.
+ * <p>
+ * The value returned is <code>count - pos</code>, which is the
+ * number of bytes remaining to be read from the input buffer. No clamping
+ * is applied, so the result is negative if <code>pos</code> has been
+ * moved past <code>count</code>.
+ *
+ * @return the number of remaining bytes that can be read (or skipped over)
+ * from this input stream without blocking.
+ */
+ public final int available()
+ {
+ return count - pos;
+ }
+
+ /**
+ * Tests if this <code>InputStream</code> supports mark/reset. The
+ * <code>markSupported</code> method of
+ * <code>FastByteArrayInputStream</code> always returns <code>true</code>.
+ *
+ * @return <code>true</code>, always
+ */
+ public final boolean markSupported()
+ {
+ return true;
+ }
+
+ /**
+ * Set the current marked position in the stream to the current value of
+ * <code>pos</code>. The mark field starts at zero, or at the offset
+ * passed to the three-argument constructor, until this method is called.
+ *
+ * <p>
+ * Note: The <code>readAheadLimit</code> for this class has no meaning.
+ *
+ * @param readAheadLimit ignored
+ */
+ public final void mark(int readAheadLimit)
+ {
+ mark = pos;
+ }
+
+ /**
+ * Resets the buffer to the marked position by assigning <code>mark</code>
+ * to <code>pos</code>. The marked position is 0 unless another position
+ * was marked or an offset was specified in the constructor.
+ */
+ public final void reset()
+ {
+ pos = mark;
+ }
+
+ /**
+ * Closing a <tt>FastByteArrayInputStream</tt> has no effect: the method
+ * body is empty, so the stream stays usable after it has been closed.
+ * <p>
+ */
+ public final void close() throws IOException
+ {
+ }
+ }
+
+ /*
+ * FastByteArrayInputStream whose backing array, window and read position
+ * can be replaced or inspected after construction.
+ */
+ private static class Buffer extends FastByteArrayInputStream
+ {
+     /* Starts out over an empty array. */
+     public Buffer()
+     {
+         super(new byte[0]);
+     }
+
+     /* Points the stream at input[start .. start + length) and rewinds to start. */
+     public void reset(byte[] input, int start, int length)
+     {
+         this.buf = input;
+         this.pos = start;
+         this.mark = start;
+         this.count = start + length;
+     }
+
+     /* Index of the next byte to be read. */
+     public int getPosition()
+     {
+         return this.pos;
+     }
+
+     /* Moves the read position; the value is stored without any check. */
+     public void setPosition(int position)
+     {
+         this.pos = position;
+     }
+
+     /* End index of the readable window (the count field). */
+     public int getLength()
+     {
+         return this.count;
+     }
+ }
+
+ private Buffer buffer_;
+
+ /**
+ * Constructs a new empty buffer. A fresh <code>Buffer</code> is created
+ * and passed to the private constructor; call one of the
+ * <code>reset</code> methods to supply data to read.
+ */
+ public DataInputBuffer()
+ {
+ this(new Buffer());
+ }
+
+ /**
+ * Wraps <code>buffer</code> as the underlying stream of this
+ * <code>DataInputStream</code> and keeps a typed reference to it so the
+ * reset and position methods can reach it.
+ */
+ private DataInputBuffer(Buffer buffer)
+ {
+ super(buffer);
+ this.buffer_ = buffer;
+ }
+
+ /**
+ * Resets the data that the buffer reads to the first <code>length</code>
+ * bytes of <code>input</code>, starting at index 0.
+ */
+ public void reset(byte[] input, int length)
+ {
+     final int start = 0;
+     this.buffer_.reset(input, start, length);
+ }
+
+ /**
+ * Resets the data that the buffer reads to <code>length</code> bytes of
+ * <code>input</code> beginning at <code>start</code>. The array is not
+ * copied, and the arguments are passed to the underlying buffer without
+ * being validated here.
+ */
+ public void reset(byte[] input, int start, int length)
+ {
+ buffer_.reset(input, start, length);
+ }
+
+ /**
+ * Returns the current position in the input, i.e. the index in the
+ * backing array of the next byte to be read.
+ */
+ public int getPosition()
+ {
+ return buffer_.getPosition();
+ }
+
+ /**
+ * Set the position within the input. The value is stored as the index of
+ * the next byte to read; no bounds check is performed.
+ */
+ public void setPosition(int position)
+ {
+ buffer_.setPosition(position);
+ }
+
+ /**
+ * Returns the length of the input. This is the underlying buffer's
+ * <code>count</code>, which after <code>reset(input, start, length)</code>
+ * equals <code>start + length</code> rather than <code>length</code>.
+ */
+ public int getLength()
+ {
+ return buffer_.getLength();
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/DataOutputBuffer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/DataOutputBuffer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/DataOutputBuffer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/DataOutputBuffer.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+/**
+ * An implementation of the DataOutputStream interface. This class is completely thread
+ * unsafe.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class DataOutputBuffer extends DataOutputStream
+{
+ /*
+ * This is a clone of the ByteArrayOutputStream but w/o the unnecessary
+ * synchronized keyword usage.
+ */
+ public static class FastByteArrayOutputStream extends OutputStream
+ {
+
+ /**
+ * The buffer where data is stored.
+ */
+ protected byte buf[];
+
+ /**
+ * The number of valid bytes in the buffer.
+ */
+ protected int count;
+
+ /**
+ * Creates a new byte array output stream. The buffer capacity is
+ * initially 32 bytes, though its size increases if necessary.
+ */
+ public FastByteArrayOutputStream()
+ {
+ this(32);
+ }
+
+ /**
+ * Creates a new byte array output stream, with a buffer capacity of the
+ * specified size, in bytes.
+ *
+ * @param size
+ * the initial size.
+ * @exception IllegalArgumentException
+ * if size is negative.
+ */
+ public FastByteArrayOutputStream(int size)
+ {
+ if (size < 0)
+ {
+ throw new IllegalArgumentException("Negative initial size: "
+ + size);
+ }
+ buf = new byte[size];
+ }
+
+ /**
+ * Writes the specified byte to this byte array output stream.
+ *
+ * @param b
+ * the byte to be written.
+ */
+ public void write(int b)
+ {
+ int newcount = count + 1;
+ if (newcount > buf.length)
+ {
+ buf = Arrays.copyOf(buf, Math.max(buf.length << 1, newcount));
+ }
+ buf[count] = (byte) b;
+ count = newcount;
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array
+ * starting at offset <code>off</code> to this byte array output
+ * stream.
+ *
+ * @param b
+ * the data.
+ * @param off
+ * the start offset in the data.
+ * @param len
+ * the number of bytes to write.
+ */
+ public void write(byte b[], int off, int len)
+ {
+ if ((off < 0) || (off > b.length) || (len < 0)
+ || ((off + len) > b.length) || ((off + len) < 0))
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ else if (len == 0)
+ {
+ return;
+ }
+ int newcount = count + len;
+ if (newcount > buf.length)
+ {
+ buf = Arrays.copyOf(buf, Math.max(buf.length << 1, newcount));
+ }
+ System.arraycopy(b, off, buf, count, len);
+ count = newcount;
+ }
+
+ /**
+ * Writes the complete contents of this byte array output stream to the
+ * specified output stream argument, as if by calling the output
+ * stream's write method using <code>out.write(buf, 0, count)</code>.
+ *
+ * @param out
+ * the output stream to which to write the data.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void writeTo(OutputStream out) throws IOException
+ {
+ out.write(buf, 0, count);
+ }
+
+ /**
+ * Resets the <code>count</code> field of this byte array output
+ * stream to zero, so that all currently accumulated output in the
+ * output stream is discarded. The output stream can be used again,
+ * reusing the already allocated buffer space.
+ *
+ * @see java.io.ByteArrayInputStream#count
+ */
+ public void reset()
+ {
+ count = 0;
+ }
+
+ /**
+ * Creates a newly allocated byte array. Its size is the current size of
+ * this output stream and the valid contents of the buffer have been
+ * copied into it.
+ *
+ * @return the current contents of this output stream, as a byte array.
+ * @see java.io.ByteArrayOutputStream#size()
+ */
+ public byte toByteArray()[]
+ {
+ return Arrays.copyOf(buf, count);
+ }
+
+ /**
+ * Returns the current size of the buffer.
+ *
+ * @return the value of the <code>count</code> field, which is the
+ * number of valid bytes in this output stream.
+ * @see java.io.ByteArrayOutputStream#count
+ */
+ public int size()
+ {
+ return count;
+ }
+
+ /**
+ * Decodes the first <code>count</code> bytes of the buffer into a string
+ * using the platform's default character set. The length of the new
+ * <tt>String</tt> is a function of that character set, and hence may
+ * not be equal to the number of bytes in the buffer.
+ *
+ * <p>
+ * Malformed-input and unmappable-character sequences are always replaced
+ * with the default replacement string for the platform's default
+ * character set. The {@linkplain java.nio.charset.CharsetDecoder} class
+ * should be used when more control over the decoding process is
+ * required.
+ *
+ * @return String decoded from the buffer's contents.
+ */
+ public String toString()
+ {
+ return new String(buf, 0, count);
+ }
+
+ /**
+ * Decodes the first <code>count</code> bytes of the buffer into a string
+ * using the charset named by <code>charsetName</code>. The length of
+ * the new <tt>String</tt> is a function of the charset, and hence may
+ * not be equal to the number of bytes in the buffer.
+ *
+ * <p>
+ * Malformed-input and unmappable-character sequences are always replaced
+ * with this charset's default replacement string. The {@link
+ * java.nio.charset.CharsetDecoder} class should be used when more
+ * control over the decoding process is required.
+ *
+ * @param charsetName
+ * the name of a supported
+ * {@linkplain java.nio.charset.Charset charset}
+ * @return String decoded from the buffer's contents.
+ * @exception UnsupportedEncodingException
+ * If the named charset is not supported
+ */
+ public String toString(String charsetName) throws UnsupportedEncodingException
+ {
+ return new String(buf, 0, count, charsetName);
+ }
+
+ /**
+ * Creates a newly allocated string from the first <code>count</code>
+ * bytes of the buffer. Each character <i>c</i> in the resulting string
+ * is constructed from the corresponding element <i>b</i> in the byte
+ * array such that: <blockquote>
+ *
+ * <pre>
+ * c == (char) (((hibyte &amp; 0xff) &lt;&lt; 8) | (b &amp; 0xff))
+ * </pre>
+ *
+ * </blockquote>
+ *
+ * @deprecated This method does not properly convert bytes into
+ * characters. Prefer the <code>toString(String enc)</code>
+ * method, which takes an encoding-name argument, or the
+ * <code>toString()</code> method, which uses the
+ * platform's default character encoding.
+ *
+ * @param hibyte
+ * the high byte of each resulting Unicode character.
+ * @return the current contents of the output stream, as a string.
+ */
+ @Deprecated
+ public String toString(int hibyte)
+ {
+ return new String(buf, hibyte, 0, count);
+ }
+
+ /**
+ * Does nothing. The body is empty, so closing this stream releases
+ * nothing and leaves the buffer and <code>count</code> exactly as they
+ * were.
+ */
+ public void close() throws IOException
+ {
+ }
+
+ }
+
+ /**
+  * Growable byte buffer that exposes its backing array and can be filled
+  * directly from a {@link DataInput} or a {@link ByteBuffer} without an
+  * intermediate copy.
+  */
+ private static class Buffer extends FastByteArrayOutputStream
+ {
+     /** Returns the backing array itself, not a copy. */
+     public byte[] getData()
+     {
+         return buf;
+     }
+
+     /** Returns the number of valid bytes at the start of the backing array. */
+     public int getLength()
+     {
+         return count;
+     }
+
+     /** Marks the buffer as empty; the backing array is kept. */
+     public void reset()
+     {
+         count = 0;
+     }
+
+     /**
+      * Appends <code>len</code> bytes read from <code>in</code>.
+      *
+      * @param in  source of the bytes.
+      * @param len number of bytes to read.
+      * @throws IOException if <code>in.readFully</code> fails; in that case
+      *                     <code>count</code> is left unchanged.
+      */
+     public void write(DataInput in, int len) throws IOException
+     {
+         int newcount = ensureCapacity(len);
+         in.readFully(buf, count, len);
+         count = newcount;
+     }
+
+     /**
+      * Appends <code>len</code> bytes taken from <code>buffer</code>.
+      *
+      * @param buffer source of the bytes.
+      * @param len    number of bytes to take.
+      * @throws IOException declared for symmetry with the DataInput overload.
+      */
+     public void write(ByteBuffer buffer, int len) throws IOException
+     {
+         int newcount = ensureCapacity(len);
+         buffer.get(buf, count, len);
+         count = newcount;
+     }
+
+     /**
+      * Grows the backing array, if needed, so that <code>len</code> more
+      * bytes fit after the current contents. Growth at least doubles the
+      * array.
+      *
+      * @return the value <code>count</code> will have once the bytes are in.
+      */
+     private int ensureCapacity(int len)
+     {
+         int newcount = count + len;
+         if (newcount > buf.length)
+         {
+             byte[] newbuf = new byte[Math.max(buf.length << 1, newcount)];
+             System.arraycopy(buf, 0, newbuf, 0, count);
+             buf = newbuf;
+         }
+         return newcount;
+     }
+ }
+
+ private Buffer buffer; // the same Buffer instance the private constructor passes to super(...)
+
+ /** Constructs a new empty buffer by delegating to the private constructor with a fresh {@link Buffer}. */
+ public DataOutputBuffer()
+ {
+ this(new Buffer());
+ }
+
+ private DataOutputBuffer(Buffer buffer)
+ {
+ super(buffer); // the superclass writes into this Buffer
+ this.buffer = buffer; // keep a typed reference for getData()/getLength()/reset()
+ }
+
+ /**
+ * Returns the buffer's backing array itself (no copy is made). The
+ * array may be longer than the data it holds; only the first
+ * {@link #getLength()} bytes are valid.
+ */
+ public byte[] getData()
+ {
+ return buffer.getData();
+ }
+
+ /** Returns the number of valid bytes currently in the buffer, i.e. how much of {@link #getData()} is meaningful. */
+ public int getLength()
+ {
+ return buffer.getLength();
+ }
+
+ /**
+ * Resets the buffer to empty: zeroes this stream's <code>written</code>
+ * field and the underlying buffer's byte count.
+ *
+ * @return this instance, so that calls can be chained.
+ */
+ public DataOutputBuffer reset()
+ {
+ this.written = 0;
+ buffer.reset();
+ return this;
+ }
+
+ /**
+ * Writes bytes from a DataInput directly into the buffer.
+ * NOTE(review): this goes straight to the underlying buffer, so the
+ * <code>written</code> field that reset() zeroes is not advanced here;
+ * confirm callers rely on {@link #getLength()} rather than size().
+ *
+ * @param in source to read from.
+ * @param length number of bytes to transfer.
+ * @throws IOException if reading from <code>in</code> fails.
+ */
+ public void write(DataInput in, int length) throws IOException
+ {
+ buffer.write(in, length);
+ }
+
+ /**
+ * Writes bytes from a ByteBuffer directly into the buffer.
+ *
+ * @param in source to take bytes from.
+ * @param length number of bytes to transfer.
+ * @throws IOException declared by the underlying buffer's write method.
+ */
+ public void write(ByteBuffer in, int length) throws IOException
+ {
+ buffer.write(in, length);
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedInputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedInputStream.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedInputStream.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedInputStream.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * A <code>BufferedInputStream</code> adds functionality to another input
+ * stream-namely, the ability to buffer the input and to support the
+ * <code>mark</code> and <code>reset</code> methods. When the
+ * <code>BufferedInputStream</code> is created, an internal buffer array is
+ * created. As bytes from the stream are read or skipped, the internal buffer is
+ * refilled as necessary from the contained input stream, many bytes at a time.
+ * The <code>mark</code> operation remembers a point in the input stream and
+ * the <code>reset</code> operation causes all the bytes read since the most
+ * recent <code>mark</code> operation to be reread before new bytes are taken
+ * from the contained input stream.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedInputStream extends FilterInputStream
+{
+
+ private static int defaultBufferSize = 8192;
+
+ /**
+ * The internal buffer array where the data is stored. When necessary, it
+ * may be replaced by another array of a different size.
+ */
+ protected volatile byte buf[];
+
+ /**
+ * Atomic updater to provide compareAndSet for buf. This is necessary
+ * because closes can be asynchronous. We use nullness of buf[] as primary
+ * indicator that this stream is closed. (The "in" field is also nulled out
+ * on close.)
+ */
+ private static final AtomicReferenceFieldUpdater<FastBufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater
+ .newUpdater(FastBufferedInputStream.class, byte[].class, "buf");
+
+ /**
+ * The index one greater than the index of the last valid byte in the
+ * buffer. This value is always in the range <code>0</code> through
+ * <code>buf.length</code>; elements <code>buf[0]</code> through
+ * <code>buf[count-1]
+ * </code>contain buffered input data obtained from the
+ * underlying input stream.
+ */
+ protected int count;
+
+ /**
+ * The current position in the buffer. This is the index of the next
+ * character to be read from the <code>buf</code> array.
+ * <p>
+ * This value is always in the range <code>0</code> through
+ * <code>count</code>. If it is less than <code>count</code>, then
+ * <code>buf[pos]</code> is the next byte to be supplied as input; if it
+ * is equal to <code>count</code>, then the next <code>read</code> or
+ * <code>skip</code> operation will require more bytes to be read from the
+ * contained input stream.
+ *
+ * @see java.io.BufferedInputStream#buf
+ */
+ protected int pos;
+
+ /**
+ * The value of the <code>pos</code> field at the time the last
+ * <code>mark</code> method was called.
+ * <p>
+ * This value is always in the range <code>-1</code> through
+ * <code>pos</code>. If there is no marked position in the input stream,
+ * this field is <code>-1</code>. If there is a marked position in the
+ * input stream, then <code>buf[markpos]</code> is the first byte to be
+ * supplied as input after a <code>reset</code> operation. If
+ * <code>markpos</code> is not <code>-1</code>, then all bytes from
+ * positions <code>buf[markpos]</code> through <code>buf[pos-1]</code>
+ * must remain in the buffer array (though they may be moved to another
+ * place in the buffer array, with suitable adjustments to the values of
+ * <code>count</code>, <code>pos</code>, and <code>markpos</code>);
+ * they may not be discarded unless and until the difference between
+ * <code>pos</code> and <code>markpos</code> exceeds
+ * <code>marklimit</code>.
+ *
+ * @see java.io.BufferedInputStream#mark(int)
+ * @see java.io.BufferedInputStream#pos
+ */
+ protected int markpos = -1;
+
+ /**
+ * The maximum read ahead allowed after a call to the <code>mark</code>
+ * method before subsequent calls to the <code>reset</code> method fail.
+ * Whenever the difference between <code>pos</code> and
+ * <code>markpos</code> exceeds <code>marklimit</code>, then the mark
+ * may be dropped by setting <code>markpos</code> to <code>-1</code>.
+ *
+ * @see java.io.BufferedInputStream#mark(int)
+ * @see java.io.BufferedInputStream#reset()
+ */
+ protected int marklimit;
+
+ /**
+ * Check to make sure that underlying input stream has not been nulled out
+ * due to close; if not return it;
+ */
+ private InputStream getInIfOpen() throws IOException
+ {
+ InputStream input = in;
+ if (input == null)
+ throw new IOException("Stream closed");
+ return input;
+ }
+
+ /**
+ * Check to make sure that buffer has not been nulled out due to close; if
+ * not return it;
+ */
+ private byte[] getBufIfOpen() throws IOException
+ {
+ byte[] buffer = buf;
+ if (buffer == null)
+ throw new IOException("Stream closed");
+ return buffer;
+ }
+
+ /**
+ * Creates a <code>FastBufferedInputStream</code> over <code>in</code>
+ * using the default buffer size (<code>defaultBufferSize</code>).
+ *
+ * @param in
+ * the underlying input stream.
+ */
+ public FastBufferedInputStream(InputStream in)
+ {
+ this(in, defaultBufferSize);
+ }
+
+ /**
+ * Creates a <code>FastBufferedInputStream</code> with the specified
+ * buffer size, and saves its argument, the input stream <code>in</code>,
+ * for later use. An internal buffer array of length <code>size</code> is
+ * created and stored in <code>buf</code>.
+ *
+ * @param in
+ * the underlying input stream.
+ * @param size
+ * the buffer size.
+ * @exception IllegalArgumentException
+ * if size &lt;= 0.
+ */
+ public FastBufferedInputStream(InputStream in, int size)
+ {
+ super(in);
+ if (size <= 0)
+ {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ buf = new byte[size];
+ }
+
+ /**
+ * Refills the buffer from the underlying stream, keeping the bytes
+ * between <code>markpos</code> and <code>pos</code> when a mark is set.
+ * <ul>
+ * <li>No mark: the buffered bytes are discarded and reading restarts at
+ * index 0.
+ * <li>Mark set and room left in the buffer: nothing is moved; new bytes
+ * are appended after <code>pos</code>.
+ * <li>Mark set and buffer full: the marked region is shifted to the front
+ * if <code>markpos &gt; 0</code>; otherwise the mark is dropped if the
+ * buffer length has reached <code>marklimit</code>; otherwise the buffer
+ * is doubled, capped at <code>marklimit</code>.
+ * </ul>
+ * Every caller in this class invokes it only when
+ * <code>pos &gt;= count</code>, i.e. all buffered data has been consumed.
+ * NOTE(review): no method in this class is synchronized; the only
+ * concurrent event the code below accounts for is an asynchronous
+ * close().
+ *
+ * @throws IOException if the stream is closed (including a close that
+ * races with the buffer growth) or the underlying read fails.
+ */
+ private void fill() throws IOException
+ {
+ byte[] buffer = getBufIfOpen();
+ if (markpos < 0)
+ pos = 0; /* no mark: throw away the buffer */
+ else if (pos >= buffer.length) /* no room left in buffer */
+ if (markpos > 0)
+ { /* can throw away early part of the buffer */
+ int sz = pos - markpos;
+ System.arraycopy(buffer, markpos, buffer, 0, sz);
+ pos = sz;
+ markpos = 0;
+ }
+ else if (buffer.length >= marklimit)
+ {
+ markpos = -1; /* buffer got too big, invalidate mark */
+ pos = 0; /* drop buffer contents */
+ }
+ else
+ { /* grow buffer */
+ int nsz = pos * 2;
+ if (nsz > marklimit)
+ nsz = marklimit;
+ byte nbuf[] = new byte[nsz];
+ System.arraycopy(buffer, 0, nbuf, 0, pos);
+ if (!bufUpdater.compareAndSet(this, buffer, nbuf))
+ {
+ // Can't replace buf if there was an async close.
+ // Note: This would need to be changed if fill()
+ // is ever made accessible to multiple threads.
+ // But for now, the only way CAS can fail is via close.
+ // assert buf == null;
+ throw new IOException("Stream closed");
+ }
+ buffer = nbuf;
+ }
+ count = pos; // nothing new buffered yet; stays this way if the read below returns <= 0
+ int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
+ if (n > 0)
+ count = n + pos;
+ }
+
+ /**
+  * Reads a single byte, refilling the buffer from the underlying stream
+  * when it has been fully consumed.
+  *
+  * @return the next byte as a value in the range 0-255, or
+  *         <code>-1</code> if the refill produced no data.
+  * @exception IOException
+  *                if this input stream has been closed by invoking its
+  *                {@link #close()} method, or an I/O error occurs.
+  * @see java.io.FilterInputStream#in
+  */
+ public int read() throws IOException
+ {
+     if (pos < count)
+     {
+         // Fast path: a buffered byte is available.
+         return getBufIfOpen()[pos++] & 0xff;
+     }
+     fill();
+     if (pos >= count)
+     {
+         return -1;
+     }
+     return getBufIfOpen()[pos++] & 0xff;
+ }
+
+ /**
+ * Read characters into a portion of an array, reading from the underlying
+ * stream at most once if necessary.
+ */
+ private int read1(byte[] b, int off, int len) throws IOException
+ {
+ int avail = count - pos;
+ if (avail <= 0)
+ {
+ /*
+ * If the requested length is at least as large as the buffer, and
+ * if there is no mark/reset activity, do not bother to copy the
+ * bytes into the local buffer. In this way buffered streams will
+ * cascade harmlessly.
+ */
+ if (len >= getBufIfOpen().length && markpos < 0)
+ {
+ return getInIfOpen().read(b, off, len);
+ }
+ fill();
+ avail = count - pos;
+ if (avail <= 0)
+ return -1;
+ }
+ int cnt = (avail < len) ? avail : len;
+ System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
+ pos += cnt;
+ return cnt;
+ }
+
+ /**
+ * Reads bytes from this byte-input stream into the specified byte array,
+ * starting at the given offset.
+ *
+ * <p>
+ * This method implements the general contract of the corresponding
+ * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
+ * the <code>{@link InputStream}</code> class. As an additional
+ * convenience, it attempts to read as many bytes as possible by repeatedly
+ * invoking the <code>read</code> method of the underlying stream. This
+ * iterated <code>read</code> continues until one of the following
+ * conditions becomes true:
+ * <ul>
+ *
+ * <li> The specified number of bytes have been read,
+ *
+ * <li> The <code>read</code> method of the underlying stream returns
+ * <code>-1</code>, indicating end-of-file, or
+ *
+ * <li> The <code>available</code> method of the underlying stream returns
+ * zero, indicating that further input requests would block.
+ *
+ * </ul>
+ * If the first <code>read</code> on the underlying stream returns
+ * <code>-1</code> to indicate end-of-file then this method returns
+ * <code>-1</code>. Otherwise this method returns the number of bytes
+ * actually read.
+ *
+ * <p>
+ * Subclasses of this class are encouraged, but not required, to attempt to
+ * read as many bytes as possible in the same fashion.
+ *
+ * @param b
+ * destination buffer.
+ * @param off
+ * offset at which to start storing bytes.
+ * @param len
+ * maximum number of bytes to read.
+ * @return the number of bytes read, or <code>-1</code> if the end of the
+ * stream has been reached.
+ * @exception IOException
+ * if this input stream has been closed by invoking its
+ * {@link #close()} method, or an I/O error occurs.
+ */
+ public int read(byte b[], int off, int len) throws IOException
+ {
+ getBufIfOpen(); // Check for closed stream
+ if ((off | len | (off + len) | (b.length - (off + len))) < 0)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ else if (len == 0)
+ {
+ return 0;
+ }
+
+ int n = 0;
+ for (;;)
+ {
+ int nread = read1(b, off + n, len - n);
+ if (nread <= 0)
+ return (n == 0) ? nread : n;
+ n += nread;
+ if (n >= len)
+ return n;
+ // if not closed but no bytes available, return
+ InputStream input = in;
+ if (input != null && input.available() <= 0)
+ return n;
+ }
+ }
+
+ /**
+  * Skips over up to <code>n</code> bytes. Buffered bytes are skipped
+  * first; when the buffer is empty and no mark is set the request is
+  * handed to the underlying stream, otherwise the buffer is refilled so
+  * the skipped bytes remain available to <code>reset</code>.
+  *
+  * @param n the number of bytes to skip.
+  * @return the number of bytes actually skipped, which may be less than
+  *         <code>n</code> (and is 0 when <code>n &lt;= 0</code>).
+  * @exception IOException
+  *                if the stream does not support seek, or if this input
+  *                stream has been closed by invoking its {@link #close()}
+  *                method, or an I/O error occurs.
+  */
+ public long skip(long n) throws IOException
+ {
+     getBufIfOpen(); // Check for closed stream
+     if (n <= 0)
+     {
+         return 0;
+     }
+
+     long buffered = count - pos;
+     if (buffered <= 0)
+     {
+         if (markpos < 0)
+         {
+             // No mark to preserve: no need to pull the bytes through the buffer.
+             return getInIfOpen().skip(n);
+         }
+         // Keep the skipped bytes in the buffer so reset() can replay them.
+         fill();
+         buffered = count - pos;
+         if (buffered <= 0)
+         {
+             return 0;
+         }
+     }
+
+     final long step = Math.min(buffered, n);
+     pos += step;
+     return step;
+ }
+
+ /**
+  * Returns an estimate of the number of bytes that can be read (or skipped
+  * over) from this input stream without blocking: the bytes still unread
+  * in the buffer (<code>count - pos</code>) plus the value reported by the
+  * underlying stream's own <code>available()</code>.
+  * <p>
+  * The result is capped at <code>Integer.MAX_VALUE</code>. Without the cap
+  * an underlying stream that reports a value close to
+  * <code>Integer.MAX_VALUE</code> would make the sum wrap around to a
+  * negative number.
+  *
+  * @return an estimate of the number of bytes that can be read (or skipped
+  *         over) from this input stream without blocking.
+  * @exception IOException
+  *                if this input stream has been closed by invoking its
+  *                {@link #close()} method, or an I/O error occurs.
+  */
+ public int available() throws IOException
+ {
+     final int upstream = getInIfOpen().available();
+     final int buffered = count - pos;
+     // Add in long arithmetic so the sum cannot overflow, then saturate.
+     final long total = (long) upstream + buffered;
+     return (total > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) total;
+ }
+
+ /**
+ * Records the current read position so that a later {@link #reset()}
+ * can return to it: stores <code>readlimit</code> in
+ * <code>marklimit</code> and the current <code>pos</code> in
+ * <code>markpos</code>.
+ *
+ * @param readlimit
+ * the maximum limit of bytes that can be read before the mark
+ * position becomes invalid.
+ */
+ public void mark(int readlimit)
+ {
+ marklimit = readlimit;
+ markpos = pos;
+ }
+
+ /**
+  * Moves the read position back to the most recent mark.
+  * <p>
+  * If <code>markpos</code> is <code>-1</code> (no mark has been set or
+  * the mark has been invalidated), an <code>IOException</code> is thrown.
+  * Otherwise, <code>pos</code> is set equal to <code>markpos</code>.
+  *
+  * @exception IOException
+  *                if this stream has not been marked, if the mark has
+  *                been invalidated, or if the stream has been closed by
+  *                invoking its {@link #close()} method.
+  */
+ public void reset() throws IOException
+ {
+     getBufIfOpen(); // Cause exception if closed
+     final int target = markpos;
+     if (target < 0)
+     {
+         throw new IOException("Resetting to invalid mark");
+     }
+     pos = target;
+ }
+
+ /**
+ * Tests if this input stream supports the <code>mark</code> and
+ * <code>reset</code> methods. This implementation always returns
+ * <code>true</code>.
+ *
+ * @return <code>true</code>.
+ * @see java.io.InputStream#mark(int)
+ * @see java.io.InputStream#reset()
+ */
+ public boolean markSupported()
+ {
+ return true;
+ }
+
+ /**
+  * Closes this input stream and the stream it wraps. The buffer reference
+  * is atomically swapped to <code>null</code>, which is what marks the
+  * stream as closed; only the caller that wins that swap goes on to close
+  * the underlying stream. Closing a previously closed stream has no
+  * effect.
+  *
+  * @exception IOException
+  *                if closing the underlying stream fails.
+  */
+ public void close() throws IOException
+ {
+     for (byte[] current = buf; current != null; current = buf)
+     {
+         if (!bufUpdater.compareAndSet(this, current, null))
+         {
+             // fill() installed a new buffer in the meantime; try again.
+             continue;
+         }
+         final InputStream source = in;
+         in = null;
+         if (source != null)
+         {
+             source.close();
+         }
+         return;
+     }
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedOutputStream.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedOutputStream.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/FastBufferedOutputStream.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+/**
+ * The class implements a buffered output stream. By setting up such an output
+ * stream, an application can write bytes to the underlying output stream
+ * without necessarily causing a call to the underlying system for each byte
+ * written.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedOutputStream extends FilterOutputStream
+{
+ /**
+ * The internal buffer where data is stored.
+ */
+ protected byte buf[];
+
+ /**
+ * The number of valid bytes in the buffer. This value is always in the
+ * range <tt>0</tt> through <tt>buf.length</tt>; elements
+ * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte
+ * data.
+ */
+ protected int count;
+
+ /**
+ * Creates a new buffered output stream to write data to the specified
+ * underlying output stream, using an 8192-byte buffer.
+ *
+ * @param out
+ * the underlying output stream.
+ */
+ public FastBufferedOutputStream(OutputStream out)
+ {
+ this(out, 8192);
+ }
+
+ /**
+ * Creates a new buffered output stream to write data to the specified
+ * underlying output stream with the specified buffer size.
+ *
+ * @param out
+ * the underlying output stream.
+ * @param size
+ * the buffer size, in bytes.
+ * @exception IllegalArgumentException
+ * if size &lt;= 0.
+ */
+ public FastBufferedOutputStream(OutputStream out, int size)
+ {
+ super(out);
+ if (size <= 0)
+ {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ buf = new byte[size];
+ }
+
+ /**
+  * Writes any buffered bytes to the underlying stream and marks the
+  * buffer as empty. Does not flush the underlying stream itself.
+  */
+ private void flushBuffer() throws IOException
+ {
+     if (count <= 0)
+     {
+         return;
+     }
+     out.write(buf, 0, count);
+     count = 0;
+ }
+
+ /**
+  * Buffers a single byte, first emptying the buffer to the underlying
+  * stream if it is already full.
+  *
+  * @param b
+  *            the byte to be written; only the low-order eight bits are
+  *            kept.
+  * @exception IOException
+  *                if an I/O error occurs.
+  */
+ public void write(int b) throws IOException
+ {
+     final boolean bufferFull = count >= buf.length;
+     if (bufferFull)
+     {
+         flushBuffer();
+     }
+     buf[count++] = (byte) b;
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array starting at
+ * offset <code>off</code> to this buffered output stream.
+ *
+ * <p>
+ * Ordinarily this method stores bytes from the given array into this
+ * stream's buffer, flushing the buffer to the underlying output stream as
+ * needed. If the requested length is at least as large as this stream's
+ * buffer, however, then this method will flush the buffer and write the
+ * bytes directly to the underlying output stream. Thus redundant
+ * <code>BufferedOutputStream</code>s will not copy data unnecessarily.
+ *
+ * @param b
+ * the data.
+ * @param off
+ * the start offset in the data.
+ * @param len
+ * the number of bytes to write.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+     final int capacity = buf.length;
+     if (len >= capacity)
+     {
+         // The request would not fit even in an empty buffer: push out what
+         // is pending, then hand the caller's array straight to the
+         // underlying stream so stacked buffered streams do not copy twice.
+         flushBuffer();
+         out.write(b, off, len);
+         return;
+     }
+     final int freeSpace = capacity - count;
+     if (len > freeSpace)
+     {
+         // Fits in the buffer, but only once it has been emptied.
+         flushBuffer();
+     }
+     System.arraycopy(b, off, buf, count, len);
+     count += len;
+ }
+
+ /**
+ * Flushes this buffered output stream: first writes any buffered bytes
+ * to the underlying output stream, then flushes that stream as well.
+ *
+ * @exception IOException
+ * if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ public void flush() throws IOException
+ {
+ flushBuffer();
+ out.flush();
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/ICompactSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/ICompactSerializer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/ICompactSerializer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/ICompactSerializer.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.DataInputStream;
+
+/**
+ * Allows for the controlled serialization/deserialization of a given type.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface ICompactSerializer<T>
+{
+ /**
+ * Serializes the given object into the given DataOutputStream.
+ * @param t object that needs to be serialized
+ * @param dos DataOutputStream into which serialization needs to happen.
+ * @throws IOException if an I/O error occurs.
+ */
+ public void serialize(T t, DataOutputStream dos) throws IOException;
+
+ /**
+ * Deserializes an object from the given DataInputStream.
+ * @param dis DataInputStream from which deserialization needs to happen.
+ * @throws IOException if an I/O error occurs.
+ * @return the object that was deserialized
+ */
+ public T deserialize(DataInputStream dis) throws IOException;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileReader.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.List;
+
+/**
+ * Interface to read from the SequenceFile abstraction.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFileReader
+{
+ public String getFileName();
+ public long getEOF() throws IOException;
+ public long getCurrentPosition() throws IOException;
+ public boolean isHealthyFileDescriptor() throws IOException;
+ public void seek(long position) throws IOException;
+ public boolean isEOF() throws IOException;
+
+ /**
+ * Be extremely careful while using this API. It is currently
+ * used to read the commit log header from the commit logs.
+ * Treat this as an internal API.
+ *
+ * @param bytes read into this byte array.
+ */
+ public void readDirect(byte[] bytes) throws IOException;
+
+ /**
+ * Read a long value from the underlying sub system.
+ * @return value read
+ * @throws IOException
+ */
+ public long readLong() throws IOException;
+
+ /**
+ * This function is used to help out subsequent reads
+ * on the specified key. It reads the keys prior to this
+ * one on disk so that the buffer cache is hot.
+ * NOTE(review): the meaning of the returned long is not stated
+ * anywhere in this interface; confirm against the implementations.
+ *
+ * @param key key for which we are performing the touch.
+ * @param fData true implies we fetch the data into buffer cache.
+ */
+ public long touch(String key , boolean fData) throws IOException;
+
+ /**
+ * This method helps in retrieving the offset of the specified
+ * key in the file using the block index.
+ *
+ * @param key key whose position we need in the block index.
+ */
+ public long getPositionFromBlockIndex(String key) throws IOException;
+
+ /**
+ * This method returns the position of the specified key and the
+ * size of its data segment from the block index.
+ *
+ * @param key key whose block metadata we are interested in.
+ * @return an instance of the block metadata for this key.
+ */
+ public SSTable.BlockMetadata getBlockMetadata(String key) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in.
+ *
+ * @param bufOut DataOutputBuffer that needs to be filled.
+ * @return number of bytes read.
+ * @throws IOException
+ */
+ public long next(DataOutputBuffer bufOut) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in.
+ *
+ * @param key key we are interested in.
+ * @param bufOut DataOutputBuffer that needs to be filled.
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return the number of bytes read.
+ */
+ public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in.
+ *
+ * @param key key we are interested in.
+ * @param bufOut DataOutputBuffer that needs to be filled.
+ * @param column name of the column in our format.
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return number of bytes that were read.
+ */
+ public long next(String key, DataOutputBuffer bufOut, String column, Coordinate section) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in. Always use this method to query for application
+ * specific data as it will have indexes.
+ *
+ * @param key - key we are interested in.
+ * @param bufOut - DataOutputBuffer that needs to be filled.
+ * @param columnFamilyName - The name of the column family only without the ":"
+ * @param columnNames - The list of columns in the columnFamilyName column family
+ * that we want to return
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return number of bytes read.
+ *
+ */
+ public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, Coordinate section) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in.
+ *
+ * @param key key we are interested in.
+ * @param bufOut DataOutputBuffer that needs to be filled.
+ * @param column name of the column in our format.
+ * @param timeRange time range we are interested in.
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return number of bytes that were read.
+ */
+ public long next(String key, DataOutputBuffer bufOut, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException;
+
+ /**
+ * Close the file after reading.
+ * @throws IOException
+ */
+ public void close() throws IOException;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/IFileWriter.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.cassandra.db.PrimaryKey;
+
+
+/**
+ * An interface for writing into the SequenceFile abstraction. It covers
+ * positioning within the file, appending data (with or without a key),
+ * raw writes, closing the file, and querying its name and size.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFileWriter
+{
+ /**
+ * Get the current position of the file pointer.
+ * @return current file pointer position
+ * @throws IOException if an I/O error occurs
+ */
+ public long getCurrentPosition() throws IOException;
+
+ /**
+ * @return the last file modification time.
+ */
+ public long lastModified();
+
+ /**
+ * Seeks the file pointer to the specified position.
+ * @param position position within the file to seek to.
+ * @throws IOException if an I/O error occurs
+ */
+ public void seek(long position) throws IOException;
+
+ /**
+ * Appends the buffer to the underlying SequenceFile.
+ * @param buffer buffer which contains the serialized data.
+ * @throws IOException if an I/O error occurs
+ */
+ public void append(DataOutputBuffer buffer) throws IOException;
+
+ /**
+ * Appends the key and the value to the underlying SequenceFile.
+ * @param keyBuffer buffer which contains the serialized key.
+ * @param buffer buffer which contains the serialized data.
+ * @throws IOException if an I/O error occurs
+ */
+ public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException;
+
+ /**
+ * Appends the key and the value to the underlying SequenceFile.
+ * @param key key associated with this piece of data.
+ * @param buffer buffer containing the serialized data.
+ * @throws IOException if an I/O error occurs
+ */
+ public void append(String key, DataOutputBuffer buffer) throws IOException;
+
+ /**
+ * Appends the key and the value to the underlying SequenceFile.
+ * @param key key associated with this piece of data.
+ * @param value byte array containing the serialized data.
+ * @throws IOException if an I/O error occurs
+ */
+ public void append(String key, byte[] value) throws IOException;
+
+ /**
+ * Appends the key and the long value to the underlying SequenceFile.
+ * This is used in the construction of the index file associated with a
+ * SSTable.
+ * @param key key associated with this piece of data.
+ * @param value value associated with this key.
+ * @throws IOException if an I/O error occurs
+ */
+ public void append(String key, long value) throws IOException;
+
+ /**
+ * Be extremely careful while using this API. This is currently
+ * used to write the commit log header in the commit logs.
+ * If not used carefully it could completely screw up reads
+ * of other key/value pairs that are written.
+ *
+ * @param bytes serialized version of the commit log header.
+ * @return a long value whose meaning is not specified by this interface.
+ * NOTE(review): presumably a file position or a byte count - confirm
+ * against the implementations.
+ * @throws IOException if an I/O error occurs
+ */
+ public long writeDirect(byte[] bytes) throws IOException;
+
+ /**
+ * Write a long into the underlying sub system.
+ * @param value long to be written
+ * @throws IOException if an I/O error occurs
+ */
+ public void writeLong(long value) throws IOException;
+
+ /**
+ * Close the file which is being used for the write.
+ * @throws IOException if an I/O error occurs
+ */
+ public void close() throws IOException;
+
+ /**
+ * Close the file after appending the passed in footer information.
+ * @param footer footer information.
+ * @param size size of the footer.
+ * @throws IOException if an I/O error occurs
+ */
+ public void close(byte[] footer, int size) throws IOException;
+
+ /**
+ * @return the name of the file.
+ */
+ public String getFileName();
+
+ /**
+ * @return the size of the file.
+ * @throws IOException if an I/O error occurs
+ */
+ public long getFileSize() throws IOException;
+}