You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [16/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedInputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedInputStream.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedInputStream.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedInputStream.java Thu Jul 30 15:30:21 2009
@@ -1,486 +1,486 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.*;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-/**
- * A <code>BufferedInputStream</code> adds functionality to another input
- * stream-namely, the ability to buffer the input and to support the
- * <code>mark</code> and <code>reset</code> methods. When the
- * <code>BufferedInputStream</code> is created, an internal buffer array is
- * created. As bytes from the stream are read or skipped, the internal buffer is
- * refilled as necessary from the contained input stream, many bytes at a time.
- * The <code>mark</code> operation remembers a point in the input stream and
- * the <code>reset</code> operation causes all the bytes read since the most
- * recent <code>mark</code> operation to be reread before new bytes are taken
- * from the contained input stream.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class FastBufferedInputStream extends FilterInputStream
-{
-
- private static int defaultBufferSize = 8192;
-
- /**
- * The internal buffer array where the data is stored. When necessary, it
- * may be replaced by another array of a different size.
- */
- protected volatile byte buf[];
-
- /**
- * Atomic updater to provide compareAndSet for buf. This is necessary
- * because closes can be asynchronous. We use nullness of buf[] as primary
- * indicator that this stream is closed. (The "in" field is also nulled out
- * on close.)
- */
- private static final AtomicReferenceFieldUpdater<FastBufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater
- .newUpdater(FastBufferedInputStream.class, byte[].class, "buf");
-
- /**
- * The index one greater than the index of the last valid byte in the
- * buffer. This value is always in the range <code>0</code> through
- * <code>buf.length</code>; elements <code>buf[0]</code> through
- * <code>buf[count-1]
- * </code>contain buffered input data obtained from the
- * underlying input stream.
- */
- protected int count;
-
- /**
- * The current position in the buffer. This is the index of the next
- * character to be read from the <code>buf</code> array.
- * <p>
- * This value is always in the range <code>0</code> through
- * <code>count</code>. If it is less than <code>count</code>, then
- * <code>buf[pos]</code> is the next byte to be supplied as input; if it
- * is equal to <code>count</code>, then the next <code>read</code> or
- * <code>skip</code> operation will require more bytes to be read from the
- * contained input stream.
- *
- * @see java.io.BufferedInputStream#buf
- */
- protected int pos;
-
- /**
- * The value of the <code>pos</code> field at the time the last
- * <code>mark</code> method was called.
- * <p>
- * This value is always in the range <code>-1</code> through
- * <code>pos</code>. If there is no marked position in the input stream,
- * this field is <code>-1</code>. If there is a marked position in the
- * input stream, then <code>buf[markpos]</code> is the first byte to be
- * supplied as input after a <code>reset</code> operation. If
- * <code>markpos</code> is not <code>-1</code>, then all bytes from
- * positions <code>buf[markpos]</code> through <code>buf[pos-1]</code>
- * must remain in the buffer array (though they may be moved to another
- * place in the buffer array, with suitable adjustments to the values of
- * <code>count</code>, <code>pos</code>, and <code>markpos</code>);
- * they may not be discarded unless and until the difference between
- * <code>pos</code> and <code>markpos</code> exceeds
- * <code>marklimit</code>.
- *
- * @see java.io.BufferedInputStream#mark(int)
- * @see java.io.BufferedInputStream#pos
- */
- protected int markpos = -1;
-
- /**
- * The maximum read ahead allowed after a call to the <code>mark</code>
- * method before subsequent calls to the <code>reset</code> method fail.
- * Whenever the difference between <code>pos</code> and
- * <code>markpos</code> exceeds <code>marklimit</code>, then the mark
- * may be dropped by setting <code>markpos</code> to <code>-1</code>.
- *
- * @see java.io.BufferedInputStream#mark(int)
- * @see java.io.BufferedInputStream#reset()
- */
- protected int marklimit;
-
- /**
- * Check to make sure that underlying input stream has not been nulled out
- * due to close; if not return it;
- */
- private InputStream getInIfOpen() throws IOException
- {
- InputStream input = in;
- if (input == null)
- throw new IOException("Stream closed");
- return input;
- }
-
- /**
- * Check to make sure that buffer has not been nulled out due to close; if
- * not return it;
- */
- private byte[] getBufIfOpen() throws IOException
- {
- byte[] buffer = buf;
- if (buffer == null)
- throw new IOException("Stream closed");
- return buffer;
- }
-
- /**
- * Creates a <code>BufferedInputStream</code> and saves its argument, the
- * input stream <code>in</code>, for later use. An internal buffer array
- * is created and stored in <code>buf</code>.
- *
- * @param in
- * the underlying input stream.
- */
- public FastBufferedInputStream(InputStream in)
- {
- this(in, defaultBufferSize);
- }
-
- /**
- * Creates a <code>BufferedInputStream</code> with the specified buffer
- * size, and saves its argument, the input stream <code>in</code>, for
- * later use. An internal buffer array of length <code>size</code> is
- * created and stored in <code>buf</code>.
- *
- * @param in
- * the underlying input stream.
- * @param size
- * the buffer size.
- * @exception IllegalArgumentException
- * if size <= 0.
- */
- public FastBufferedInputStream(InputStream in, int size)
- {
- super(in);
- if (size <= 0)
- {
- throw new IllegalArgumentException("Buffer size <= 0");
- }
- buf = new byte[size];
- }
-
- /**
- * Fills the buffer with more data, taking into account shuffling and other
- * tricks for dealing with marks. Assumes that it is being called by a
- * synchronized method. This method also assumes that all data has already
- * been read in, hence pos > count.
- */
- private void fill() throws IOException
- {
- byte[] buffer = getBufIfOpen();
- if (markpos < 0)
- pos = 0; /* no mark: throw away the buffer */
- else if (pos >= buffer.length) /* no room left in buffer */
- if (markpos > 0)
- { /* can throw away early part of the buffer */
- int sz = pos - markpos;
- System.arraycopy(buffer, markpos, buffer, 0, sz);
- pos = sz;
- markpos = 0;
- }
- else if (buffer.length >= marklimit)
- {
- markpos = -1; /* buffer got too big, invalidate mark */
- pos = 0; /* drop buffer contents */
- }
- else
- { /* grow buffer */
- int nsz = pos * 2;
- if (nsz > marklimit)
- nsz = marklimit;
- byte nbuf[] = new byte[nsz];
- System.arraycopy(buffer, 0, nbuf, 0, pos);
- if (!bufUpdater.compareAndSet(this, buffer, nbuf))
- {
- // Can't replace buf if there was an async close.
- // Note: This would need to be changed if fill()
- // is ever made accessible to multiple threads.
- // But for now, the only way CAS can fail is via close.
- // assert buf == null;
- throw new IOException("Stream closed");
- }
- buffer = nbuf;
- }
- count = pos;
- int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
- if (n > 0)
- count = n + pos;
- }
-
- /**
- * See the general contract of the <code>read</code> method of
- * <code>InputStream</code>.
- *
- * @return the next byte of data, or <code>-1</code> if the end of the
- * stream is reached.
- * @exception IOException
- * if this input stream has been closed by invoking its
- * {@link #close()} method, or an I/O error occurs.
- * @see java.io.FilterInputStream#in
- */
- public int read() throws IOException
- {
- if (pos >= count)
- {
- fill();
- if (pos >= count)
- return -1;
- }
- return getBufIfOpen()[pos++] & 0xff;
- }
-
- /**
- * Read characters into a portion of an array, reading from the underlying
- * stream at most once if necessary.
- */
- private int read1(byte[] b, int off, int len) throws IOException
- {
- int avail = count - pos;
- if (avail <= 0)
- {
- /*
- * If the requested length is at least as large as the buffer, and
- * if there is no mark/reset activity, do not bother to copy the
- * bytes into the local buffer. In this way buffered streams will
- * cascade harmlessly.
- */
- if (len >= getBufIfOpen().length && markpos < 0)
- {
- return getInIfOpen().read(b, off, len);
- }
- fill();
- avail = count - pos;
- if (avail <= 0)
- return -1;
- }
- int cnt = (avail < len) ? avail : len;
- System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
- pos += cnt;
- return cnt;
- }
-
- /**
- * Reads bytes from this byte-input stream into the specified byte array,
- * starting at the given offset.
- *
- * <p>
- * This method implements the general contract of the corresponding
- * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
- * the <code>{@link InputStream}</code> class. As an additional
- * convenience, it attempts to read as many bytes as possible by repeatedly
- * invoking the <code>read</code> method of the underlying stream. This
- * iterated <code>read</code> continues until one of the following
- * conditions becomes true:
- * <ul>
- *
- * <li> The specified number of bytes have been read,
- *
- * <li> The <code>read</code> method of the underlying stream returns
- * <code>-1</code>, indicating end-of-file, or
- *
- * <li> The <code>available</code> method of the underlying stream returns
- * zero, indicating that further input requests would block.
- *
- * </ul>
- * If the first <code>read</code> on the underlying stream returns
- * <code>-1</code> to indicate end-of-file then this method returns
- * <code>-1</code>. Otherwise this method returns the number of bytes
- * actually read.
- *
- * <p>
- * Subclasses of this class are encouraged, but not required, to attempt to
- * read as many bytes as possible in the same fashion.
- *
- * @param b
- * destination buffer.
- * @param off
- * offset at which to start storing bytes.
- * @param len
- * maximum number of bytes to read.
- * @return the number of bytes read, or <code>-1</code> if the end of the
- * stream has been reached.
- * @exception IOException
- * if this input stream has been closed by invoking its
- * {@link #close()} method, or an I/O error occurs.
- */
- public int read(byte b[], int off, int len) throws IOException
- {
- getBufIfOpen(); // Check for closed stream
- if ((off | len | (off + len) | (b.length - (off + len))) < 0)
- {
- throw new IndexOutOfBoundsException();
- }
- else if (len == 0)
- {
- return 0;
- }
-
- int n = 0;
- for (;;)
- {
- int nread = read1(b, off + n, len - n);
- if (nread <= 0)
- return (n == 0) ? nread : n;
- n += nread;
- if (n >= len)
- return n;
- // if not closed but no bytes available, return
- InputStream input = in;
- if (input != null && input.available() <= 0)
- return n;
- }
- }
-
- /**
- * See the general contract of the <code>skip</code> method of
- * <code>InputStream</code>.
- *
- * @exception IOException
- * if the stream does not support seek, or if this input
- * stream has been closed by invoking its {@link #close()}
- * method, or an I/O error occurs.
- */
- public long skip(long n) throws IOException
- {
- getBufIfOpen(); // Check for closed stream
- if (n <= 0)
- {
- return 0;
- }
- long avail = count - pos;
-
- if (avail <= 0)
- {
- // If no mark position set then don't keep in buffer
- if (markpos < 0)
- return getInIfOpen().skip(n);
-
- // Fill in buffer to save bytes for reset
- fill();
- avail = count - pos;
- if (avail <= 0)
- return 0;
- }
-
- long skipped = (avail < n) ? avail : n;
- pos += skipped;
- return skipped;
- }
-
- /**
- * Returns an estimate of the number of bytes that can be read (or skipped
- * over) from this input stream without blocking by the next invocation of a
- * method for this input stream. The next invocation might be the same
- * thread or another thread. A single read or skip of this many bytes will
- * not block, but may read or skip fewer bytes.
- * <p>
- * This method returns the sum of the number of bytes remaining to be read
- * in the buffer (<code>count - pos</code>) and the result of
- * calling the {@link java.io.FilterInputStream#in in}.available().
- *
- * @return an estimate of the number of bytes that can be read (or skipped
- * over) from this input stream without blocking.
- * @exception IOException
- * if this input stream has been closed by invoking its
- * {@link #close()} method, or an I/O error occurs.
- */
- public int available() throws IOException
- {
- return getInIfOpen().available() + (count - pos);
- }
-
- /**
- * See the general contract of the <code>mark</code> method of
- * <code>InputStream</code>.
- *
- * @param readlimit
- * the maximum limit of bytes that can be read before the mark
- * position becomes invalid.
- * @see java.io.BufferedInputStream#reset()
- */
- public void mark(int readlimit)
- {
- marklimit = readlimit;
- markpos = pos;
- }
-
- /**
- * See the general contract of the <code>reset</code> method of
- * <code>InputStream</code>.
- * <p>
- * If <code>markpos</code> is <code>-1</code> (no mark has been set or
- * the mark has been invalidated), an <code>IOException</code> is thrown.
- * Otherwise, <code>pos</code> is set equal to <code>markpos</code>.
- *
- * @exception IOException
- * if this stream has not been marked or, if the mark has
- * been invalidated, or the stream has been closed by
- * invoking its {@link #close()} method, or an I/O error
- * occurs.
- * @see java.io.BufferedInputStream#mark(int)
- */
- public void reset() throws IOException
- {
- getBufIfOpen(); // Cause exception if closed
- if (markpos < 0)
- throw new IOException("Resetting to invalid mark");
- pos = markpos;
- }
-
- /**
- * Tests if this input stream supports the <code>mark</code> and
- * <code>reset</code> methods. The <code>markSupported</code> method of
- * <code>BufferedInputStream</code> returns <code>true</code>.
- *
- * @return a <code>boolean</code> indicating if this stream type supports
- * the <code>mark</code> and <code>reset</code> methods.
- * @see java.io.InputStream#mark(int)
- * @see java.io.InputStream#reset()
- */
- public boolean markSupported()
- {
- return true;
- }
-
- /**
- * Closes this input stream and releases any system resources associated
- * with the stream. Once the stream has been closed, further read(),
- * available(), reset(), or skip() invocations will throw an IOException.
- * Closing a previously closed stream has no effect.
- *
- * @exception IOException
- * if an I/O error occurs.
- */
- public void close() throws IOException
- {
- byte[] buffer;
- while ((buffer = buf) != null)
- {
- if (bufUpdater.compareAndSet(this, buffer, null))
- {
- InputStream input = in;
- in = null;
- if (input != null)
- input.close();
- return;
- }
- // Else retry in case a new buf was CASed in fill()
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * A <code>BufferedInputStream</code> adds functionality to another input
+ * stream, namely the ability to buffer the input and to support the
+ * <code>mark</code> and <code>reset</code> methods. When the
+ * <code>BufferedInputStream</code> is created, an internal buffer array is
+ * created. As bytes from the stream are read or skipped, the internal buffer is
+ * refilled as necessary from the contained input stream, many bytes at a time.
+ * The <code>mark</code> operation remembers a point in the input stream and
+ * the <code>reset</code> operation causes all the bytes read since the most
+ * recent <code>mark</code> operation to be reread before new bytes are taken
+ * from the contained input stream.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedInputStream extends FilterInputStream
+{
+
+ private static int defaultBufferSize = 8192;
+
+ /**
+ * The internal buffer array where the data is stored. When necessary, it
+ * may be replaced by another array of a different size.
+ */
+ protected volatile byte buf[];
+
+ /**
+ * Atomic updater to provide compareAndSet for buf. This is necessary
+ * because closes can be asynchronous. We use nullness of buf[] as primary
+ * indicator that this stream is closed. (The "in" field is also nulled out
+ * on close.)
+ */
+ private static final AtomicReferenceFieldUpdater<FastBufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater
+ .newUpdater(FastBufferedInputStream.class, byte[].class, "buf");
+
+ /**
+ * The index one greater than the index of the last valid byte in the
+ * buffer. This value is always in the range <code>0</code> through
+ * <code>buf.length</code>; elements <code>buf[0]</code> through
+ * <code>buf[count-1]
+ * </code>contain buffered input data obtained from the
+ * underlying input stream.
+ */
+ protected int count;
+
+ /**
+ * The current position in the buffer. This is the index of the next
+ * byte to be read from the <code>buf</code> array.
+ * <p>
+ * This value is always in the range <code>0</code> through
+ * <code>count</code>. If it is less than <code>count</code>, then
+ * <code>buf[pos]</code> is the next byte to be supplied as input; if it
+ * is equal to <code>count</code>, then the next <code>read</code> or
+ * <code>skip</code> operation will require more bytes to be read from the
+ * contained input stream.
+ *
+ * @see java.io.BufferedInputStream#buf
+ */
+ protected int pos;
+
+ /**
+ * The value of the <code>pos</code> field at the time the last
+ * <code>mark</code> method was called.
+ * <p>
+ * This value is always in the range <code>-1</code> through
+ * <code>pos</code>. If there is no marked position in the input stream,
+ * this field is <code>-1</code>. If there is a marked position in the
+ * input stream, then <code>buf[markpos]</code> is the first byte to be
+ * supplied as input after a <code>reset</code> operation. If
+ * <code>markpos</code> is not <code>-1</code>, then all bytes from
+ * positions <code>buf[markpos]</code> through <code>buf[pos-1]</code>
+ * must remain in the buffer array (though they may be moved to another
+ * place in the buffer array, with suitable adjustments to the values of
+ * <code>count</code>, <code>pos</code>, and <code>markpos</code>);
+ * they may not be discarded unless and until the difference between
+ * <code>pos</code> and <code>markpos</code> exceeds
+ * <code>marklimit</code>.
+ *
+ * @see java.io.BufferedInputStream#mark(int)
+ * @see java.io.BufferedInputStream#pos
+ */
+ protected int markpos = -1;
+
+ /**
+ * The maximum read ahead allowed after a call to the <code>mark</code>
+ * method before subsequent calls to the <code>reset</code> method fail.
+ * Whenever the difference between <code>pos</code> and
+ * <code>markpos</code> exceeds <code>marklimit</code>, then the mark
+ * may be dropped by setting <code>markpos</code> to <code>-1</code>.
+ *
+ * @see java.io.BufferedInputStream#mark(int)
+ * @see java.io.BufferedInputStream#reset()
+ */
+ protected int marklimit;
+
+ /**
+ * Returns the underlying input stream, or throws an
+ * <code>IOException</code> if it has been nulled out by close.
+ */
+ private InputStream getInIfOpen() throws IOException
+ {
+ InputStream input = in;
+ if (input == null)
+ throw new IOException("Stream closed");
+ return input;
+ }
+
+ /**
+ * Returns the internal buffer, or throws an <code>IOException</code> if it
+ * has been nulled out by close.
+ */
+ private byte[] getBufIfOpen() throws IOException
+ {
+ byte[] buffer = buf;
+ if (buffer == null)
+ throw new IOException("Stream closed");
+ return buffer;
+ }
+
+ /**
+ * Creates a <code>FastBufferedInputStream</code> and saves its argument, the
+ * input stream <code>in</code>, for later use. An internal buffer array
+ * is created and stored in <code>buf</code>.
+ *
+ * @param in
+ * the underlying input stream.
+ */
+ public FastBufferedInputStream(InputStream in)
+ {
+ this(in, defaultBufferSize);
+ }
+
+ /**
+ * Creates a <code>FastBufferedInputStream</code> with the specified buffer
+ * size, and saves its argument, the input stream <code>in</code>, for
+ * later use. An internal buffer array of length <code>size</code> is
+ * created and stored in <code>buf</code>.
+ *
+ * @param in
+ * the underlying input stream.
+ * @param size
+ * the buffer size.
+ * @exception IllegalArgumentException
+ * if size <= 0.
+ */
+ public FastBufferedInputStream(InputStream in, int size)
+ {
+ super(in);
+ if (size <= 0)
+ {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ buf = new byte[size];
+ }
+
+ /**
+ * Fills the buffer with more data, taking into account shuffling and other
+ * tricks for dealing with marks. No method of this class is synchronized, so
+ * callers must not invoke it concurrently. This method also assumes that all
+ * buffered data has already been consumed, hence pos >= count.
+ */
+ private void fill() throws IOException
+ {
+ byte[] buffer = getBufIfOpen();
+ if (markpos < 0)
+ pos = 0; /* no mark: throw away the buffer */
+ else if (pos >= buffer.length) /* no room left in buffer */
+ if (markpos > 0)
+ { /* can throw away early part of the buffer */
+ int sz = pos - markpos;
+ System.arraycopy(buffer, markpos, buffer, 0, sz);
+ pos = sz;
+ markpos = 0;
+ }
+ else if (buffer.length >= marklimit)
+ {
+ markpos = -1; /* buffer got too big, invalidate mark */
+ pos = 0; /* drop buffer contents */
+ }
+ else
+ { /* grow buffer */
+ int nsz = pos * 2;
+ if (nsz > marklimit)
+ nsz = marklimit;
+ byte nbuf[] = new byte[nsz];
+ System.arraycopy(buffer, 0, nbuf, 0, pos);
+ if (!bufUpdater.compareAndSet(this, buffer, nbuf))
+ {
+ // Can't replace buf if there was an async close.
+ // Note: This would need to be changed if fill()
+ // is ever made accessible to multiple threads.
+ // But for now, the only way CAS can fail is via close.
+ // assert buf == null;
+ throw new IOException("Stream closed");
+ }
+ buffer = nbuf;
+ }
+ count = pos;
+ int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
+ if (n > 0)
+ count = n + pos;
+ }
+
+ /**
+ * See the general contract of the <code>read</code> method of
+ * <code>InputStream</code>.
+ *
+ * @return the next byte of data, or <code>-1</code> if the end of the
+ * stream is reached.
+ * @exception IOException
+ * if this input stream has been closed by invoking its
+ * {@link #close()} method, or an I/O error occurs.
+ * @see java.io.FilterInputStream#in
+ */
+ public int read() throws IOException
+ {
+ if (pos >= count)
+ {
+ fill();
+ if (pos >= count)
+ return -1;
+ }
+ return getBufIfOpen()[pos++] & 0xff;
+ }
+
+ /**
+ * Reads bytes into a portion of an array, reading from the underlying
+ * stream at most once if necessary.
+ */
+ private int read1(byte[] b, int off, int len) throws IOException
+ {
+ int avail = count - pos;
+ if (avail <= 0)
+ {
+ /*
+ * If the requested length is at least as large as the buffer, and
+ * if there is no mark/reset activity, do not bother to copy the
+ * bytes into the local buffer. In this way buffered streams will
+ * cascade harmlessly.
+ */
+ if (len >= getBufIfOpen().length && markpos < 0)
+ {
+ return getInIfOpen().read(b, off, len);
+ }
+ fill();
+ avail = count - pos;
+ if (avail <= 0)
+ return -1;
+ }
+ int cnt = (avail < len) ? avail : len;
+ System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
+ pos += cnt;
+ return cnt;
+ }
+
+ /**
+ * Reads bytes from this byte-input stream into the specified byte array,
+ * starting at the given offset.
+ *
+ * <p>
+ * This method implements the general contract of the corresponding
+ * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
+ * the <code>{@link InputStream}</code> class. As an additional
+ * convenience, it attempts to read as many bytes as possible by repeatedly
+ * invoking the <code>read</code> method of the underlying stream. This
+ * iterated <code>read</code> continues until one of the following
+ * conditions becomes true:
+ * <ul>
+ *
+ * <li> The specified number of bytes have been read,
+ *
+ * <li> The <code>read</code> method of the underlying stream returns
+ * <code>-1</code>, indicating end-of-file, or
+ *
+ * <li> The <code>available</code> method of the underlying stream returns
+ * zero, indicating that further input requests would block.
+ *
+ * </ul>
+ * If the first <code>read</code> on the underlying stream returns
+ * <code>-1</code> to indicate end-of-file then this method returns
+ * <code>-1</code>. Otherwise this method returns the number of bytes
+ * actually read.
+ *
+ * <p>
+ * Subclasses of this class are encouraged, but not required, to attempt to
+ * read as many bytes as possible in the same fashion.
+ *
+ * @param b
+ * destination buffer.
+ * @param off
+ * offset at which to start storing bytes.
+ * @param len
+ * maximum number of bytes to read.
+ * @return the number of bytes read, or <code>-1</code> if the end of the
+ * stream has been reached.
+ * @exception IOException
+ * if this input stream has been closed by invoking its
+ * {@link #close()} method, or an I/O error occurs.
+ */
+ public int read(byte b[], int off, int len) throws IOException
+ {
+ getBufIfOpen(); // Check for closed stream
+ if ((off | len | (off + len) | (b.length - (off + len))) < 0)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ else if (len == 0)
+ {
+ return 0;
+ }
+
+ int n = 0;
+ for (;;)
+ {
+ int nread = read1(b, off + n, len - n);
+ if (nread <= 0)
+ return (n == 0) ? nread : n;
+ n += nread;
+ if (n >= len)
+ return n;
+ // if not closed but no bytes available, return
+ InputStream input = in;
+ if (input != null && input.available() <= 0)
+ return n;
+ }
+ }
+
+ /**
+ * See the general contract of the <code>skip</code> method of
+ * <code>InputStream</code>.
+ *
+ * @exception IOException
+ * if the stream does not support seek, or if this input
+ * stream has been closed by invoking its {@link #close()}
+ * method, or an I/O error occurs.
+ */
+ public long skip(long n) throws IOException
+ {
+ getBufIfOpen(); // Check for closed stream
+ if (n <= 0)
+ {
+ return 0;
+ }
+ long avail = count - pos;
+
+ if (avail <= 0)
+ {
+ // If no mark position set then don't keep in buffer
+ if (markpos < 0)
+ return getInIfOpen().skip(n);
+
+ // Fill in buffer to save bytes for reset
+ fill();
+ avail = count - pos;
+ if (avail <= 0)
+ return 0;
+ }
+
+ long skipped = (avail < n) ? avail : n;
+ pos += skipped;
+ return skipped;
+ }
+
+ /**
+ * Returns an estimate of the number of bytes that can be read (or skipped
+ * over) from this input stream without blocking by the next invocation of a
+ * method for this input stream. The next invocation might be the same
+ * thread or another thread. A single read or skip of this many bytes will
+ * not block, but may read or skip fewer bytes.
+ * <p>
+ * This method returns the sum of the number of bytes remaining to be read
+ * in the buffer (<code>count - pos</code>) and the result of
+ * calling the {@link java.io.FilterInputStream#in in}.available().
+ *
+ * @return an estimate of the number of bytes that can be read (or skipped
+ * over) from this input stream without blocking.
+ * @exception IOException
+ * if this input stream has been closed by invoking its
+ * {@link #close()} method, or an I/O error occurs.
+ */
+ public int available() throws IOException
+ {
+ return getInIfOpen().available() + (count - pos);
+ }
+
+ /**
+ * See the general contract of the <code>mark</code> method of
+ * <code>InputStream</code>.
+ *
+ * @param readlimit
+ * the maximum limit of bytes that can be read before the mark
+ * position becomes invalid.
+ * @see java.io.BufferedInputStream#reset()
+ */
+ public void mark(int readlimit)
+ {
+ marklimit = readlimit;
+ markpos = pos;
+ }
+
+ /**
+ * See the general contract of the <code>reset</code> method of
+ * <code>InputStream</code>.
+ * <p>
+ * If <code>markpos</code> is <code>-1</code> (no mark has been set or
+ * the mark has been invalidated), an <code>IOException</code> is thrown.
+ * Otherwise, <code>pos</code> is set equal to <code>markpos</code>.
+ *
+ * @exception IOException
+ * if this stream has not been marked or, if the mark has
+ * been invalidated, or the stream has been closed by
+ * invoking its {@link #close()} method, or an I/O error
+ * occurs.
+ * @see java.io.BufferedInputStream#mark(int)
+ */
+ public void reset() throws IOException
+ {
+ getBufIfOpen(); // Cause exception if closed
+ if (markpos < 0)
+ throw new IOException("Resetting to invalid mark");
+ pos = markpos;
+ }
+
+ /**
+ * Tests if this input stream supports the <code>mark</code> and
+ * <code>reset</code> methods. The <code>markSupported</code> method of
+ * <code>BufferedInputStream</code> returns <code>true</code>.
+ *
+ * @return a <code>boolean</code> indicating if this stream type supports
+ * the <code>mark</code> and <code>reset</code> methods.
+ * @see java.io.InputStream#mark(int)
+ * @see java.io.InputStream#reset()
+ */
+ public boolean markSupported()
+ {
+ return true;
+ }
+
+ /**
+ * Closes this input stream and releases any system resources associated
+ * with the stream. Once the stream has been closed, further read(),
+ * available(), reset(), or skip() invocations will throw an IOException.
+ * Closing a previously closed stream has no effect.
+ *
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void close() throws IOException
+ {
+ byte[] buffer;
+ while ((buffer = buf) != null)
+ {
+ if (bufUpdater.compareAndSet(this, buffer, null))
+ {
+ InputStream input = in;
+ in = null;
+ if (input != null)
+ input.close();
+ return;
+ }
+ // Else retry in case a new buf was CASed in fill()
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedOutputStream.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedOutputStream.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedOutputStream.java Thu Jul 30 15:30:21 2009
@@ -1,162 +1,162 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.*;
-
-/**
- * The class implements a buffered output stream. By setting up such an output
- * stream, an application can write bytes to the underlying output stream
- * without necessarily causing a call to the underlying system for each byte
- * written.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class FastBufferedOutputStream extends FilterOutputStream
-{
- /**
- * The internal buffer where data is stored.
- */
- protected byte buf[];
-
- /**
- * The number of valid bytes in the buffer. This value is always in the
- * range <tt>0</tt> through <tt>buf.length</tt>; elements
- * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte
- * data.
- */
- protected int count;
-
- /**
- * Creates a new buffered output stream to write data to the specified
- * underlying output stream.
- *
- * @param out
- * the underlying output stream.
- */
- public FastBufferedOutputStream(OutputStream out)
- {
- this(out, 8192);
- }
-
- /**
- * Creates a new buffered output stream to write data to the specified
- * underlying output stream with the specified buffer size.
- *
- * @param out
- * the underlying output stream.
- * @param size
- * the buffer size.
- * @exception IllegalArgumentException
- * if size <= 0.
- */
- public FastBufferedOutputStream(OutputStream out, int size)
- {
- super(out);
- if (size <= 0)
- {
- throw new IllegalArgumentException("Buffer size <= 0");
- }
- buf = new byte[size];
- }
-
- /** Flush the internal buffer */
- private void flushBuffer() throws IOException
- {
- if (count > 0)
- {
- out.write(buf, 0, count);
- count = 0;
- }
- }
-
- /**
- * Writes the specified byte to this buffered output stream.
- *
- * @param b
- * the byte to be written.
- * @exception IOException
- * if an I/O error occurs.
- */
- public void write(int b) throws IOException
- {
- if (count >= buf.length)
- {
- flushBuffer();
- }
- buf[count++] = (byte) b;
- }
-
- /**
- * Writes <code>len</code> bytes from the specified byte array starting at
- * offset <code>off</code> to this buffered output stream.
- *
- * <p>
- * Ordinarily this method stores bytes from the given array into this
- * stream's buffer, flushing the buffer to the underlying output stream as
- * needed. If the requested length is at least as large as this stream's
- * buffer, however, then this method will flush the buffer and write the
- * bytes directly to the underlying output stream. Thus redundant
- * <code>BufferedOutputStream</code>s will not copy data unnecessarily.
- *
- * @param b
- * the data.
- * @param off
- * the start offset in the data.
- * @param len
- * the number of bytes to write.
- * @exception IOException
- * if an I/O error occurs.
- */
- public void write(byte b[], int off, int len)
- throws IOException
- {
- if (len >= buf.length)
- {
- /*
- * If the request length exceeds the size of the output buffer,
- * flush the output buffer and then write the data directly. In this
- * way buffered streams will cascade harmlessly.
- */
- flushBuffer();
- out.write(b, off, len);
- return;
- }
- if (len > buf.length - count)
- {
- flushBuffer();
- }
- System.arraycopy(b, off, buf, count, len);
- count += len;
- }
-
- /**
- * Flushes this buffered output stream. This forces any buffered output
- * bytes to be written out to the underlying output stream.
- *
- * @exception IOException
- * if an I/O error occurs.
- * @see java.io.FilterOutputStream#out
- */
- public void flush() throws IOException
- {
- flushBuffer();
- out.flush();
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+/**
+ * The class implements a buffered output stream. By setting up such an output
+ * stream, an application can write bytes to the underlying output stream
+ * without necessarily causing a call to the underlying system for each byte
+ * written.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedOutputStream extends FilterOutputStream
+{
+ /**
+ * The internal buffer where data is stored.
+ */
+ protected byte buf[];
+
+ /**
+ * The number of valid bytes in the buffer. This value is always in the
+ * range <tt>0</tt> through <tt>buf.length</tt>; elements
+ * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte
+ * data.
+ */
+ protected int count;
+
+ /**
+ * Creates a new buffered output stream to write data to the specified
+ * underlying output stream, using a default buffer size of 8192 bytes.
+ *
+ * @param out
+ * the underlying output stream.
+ */
+ public FastBufferedOutputStream(OutputStream out)
+ {
+ this(out, 8192);
+ }
+
+ /**
+ * Creates a new buffered output stream to write data to the specified
+ * underlying output stream with the specified buffer size.
+ *
+ * @param out
+ * the underlying output stream.
+ * @param size
+ * the buffer size, in bytes.
+ * @exception IllegalArgumentException
+ * if size <= 0.
+ */
+ public FastBufferedOutputStream(OutputStream out, int size)
+ {
+ super(out);
+ if (size <= 0)
+ {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ buf = new byte[size];
+ }
+
+ /** Writes any buffered bytes to the underlying stream and resets count; does not flush the underlying stream itself. */
+ private void flushBuffer() throws IOException
+ {
+ if (count > 0)
+ {
+ out.write(buf, 0, count);
+ count = 0;
+ }
+ }
+
+ /**
+ * Writes the specified byte to this buffered output stream.
+ *
+ * @param b
+ * the byte to be written.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(int b) throws IOException
+ {
+ if (count >= buf.length) // buffer is full: empty it before storing the new byte
+ {
+ flushBuffer();
+ }
+ buf[count++] = (byte) b; // the cast keeps only the low-order 8 bits of b
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array starting at
+ * offset <code>off</code> to this buffered output stream.
+ *
+ * <p>
+ * Ordinarily this method stores bytes from the given array into this
+ * stream's buffer, flushing the buffer to the underlying output stream as
+ * needed. If the requested length is at least as large as this stream's
+ * buffer, however, then this method will flush the buffer and write the
+ * bytes directly to the underlying output stream. Thus redundant
+ * <code>BufferedOutputStream</code>s will not copy data unnecessarily.
+ *
+ * @param b
+ * the data.
+ * @param off
+ * the start offset in the data.
+ * @param len
+ * the number of bytes to write.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(byte b[], int off, int len)
+ throws IOException
+ {
+ if (len >= buf.length)
+ {
+ /*
+ * If the request length is at least the size of the output buffer,
+ * flush the output buffer and then write the data directly. In this
+ * way buffered streams will cascade harmlessly.
+ */
+ flushBuffer();
+ out.write(b, off, len);
+ return;
+ }
+ if (len > buf.length - count) // not enough free space left: empty the buffer first
+ {
+ flushBuffer();
+ }
+ System.arraycopy(b, off, buf, count, len);
+ count += len;
+ }
+
+ /**
+ * Flushes this buffered output stream. This forces any buffered output
+ * bytes to be written out to the underlying output stream, which is then flushed too.
+ *
+ * @exception IOException
+ * if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ public void flush() throws IOException
+ {
+ flushBuffer();
+ out.flush();
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java Thu Jul 30 15:30:21 2009
@@ -1,192 +1,192 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.SSTableReader;
-
-import org.apache.log4j.Logger;
-import com.google.common.collect.AbstractIterator;
-
-
-public class FileStruct implements Comparable<FileStruct>, Iterator<String>
-{
- private static Logger logger = Logger.getLogger(FileStruct.class);
-
- private String key = null; // decorated!
- private boolean exhausted = false;
- private IFileReader reader;
- private DataInputBuffer bufIn;
- private DataOutputBuffer bufOut;
- private SSTableReader sstable;
- private FileStructIterator iterator;
-
- FileStruct(SSTableReader sstable) throws IOException
- {
- this.reader = SequenceFile.bufferedReader(sstable.getFilename(), 1024 * 1024);
- this.sstable = sstable;
- bufIn = new DataInputBuffer();
- bufOut = new DataOutputBuffer();
- }
-
- public String getFileName()
- {
- return reader.getFileName();
- }
-
- public void close() throws IOException
- {
- reader.close();
- }
-
- public boolean isExhausted()
- {
- return exhausted;
- }
-
- public DataInputBuffer getBufIn()
- {
- return bufIn;
- }
-
- public String getKey()
- {
- return key;
- }
-
- public int compareTo(FileStruct f)
- {
- return sstable.getPartitioner().getDecoratedKeyComparator().compare(key, f.key);
- }
-
- public void seekTo(String seekKey)
- {
- try
- {
- long position = sstable.getNearestPosition(seekKey);
- if (position < 0)
- {
- exhausted = true;
- return;
- }
- reader.seek(position);
- advance();
- }
- catch (IOException e)
- {
- throw new RuntimeException("corrupt sstable", e);
- }
- }
-
- /*
- * Read the next key from the data file, skipping block indexes.
- * Caller must check isExhausted after each call to see if further
- * reads are valid.
- * Do not mix with calls to the iterator interface (next/hasnext).
- * @deprecated -- prefer the iterator interface.
- */
- public void advance() throws IOException
- {
- if (exhausted)
- {
- throw new IndexOutOfBoundsException();
- }
-
- bufOut.reset();
- if (reader.isEOF())
- {
- reader.close();
- exhausted = true;
- return;
- }
-
- long bytesread = reader.next(bufOut);
- if (bytesread == -1)
- {
- reader.close();
- exhausted = true;
- return;
- }
-
- bufIn.reset(bufOut.getData(), bufOut.getLength());
- key = bufIn.readUTF();
- }
-
- public boolean hasNext()
- {
- if (iterator == null)
- iterator = new FileStructIterator();
- return iterator.hasNext();
- }
-
- /** do not mix with manual calls to advance(). */
- public String next()
- {
- if (iterator == null)
- iterator = new FileStructIterator();
- return iterator.next();
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
-
- private class FileStructIterator extends AbstractIterator<String>
- {
- public FileStructIterator()
- {
- if (key == null)
- {
- if (!isExhausted())
- {
- forward();
- }
- }
- }
-
- private void forward()
- {
- try
- {
- advance();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- protected String computeNext()
- {
- if (isExhausted())
- {
- return endOfData();
- }
- String oldKey = key;
- forward();
- return oldKey;
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.SSTableReader;
+
+import org.apache.log4j.Logger;
+import com.google.common.collect.AbstractIterator;
+
+
+public class FileStruct implements Comparable<FileStruct>, Iterator<String>
+{
+ private static Logger logger = Logger.getLogger(FileStruct.class); // NOTE(review): not referenced anywhere in this class
+
+ private String key = null; // most recently read key, in decorated form (compared via the partitioner's decorated-key comparator)
+ private boolean exhausted = false;
+ private IFileReader reader;
+ private DataInputBuffer bufIn;
+ private DataOutputBuffer bufOut;
+ private SSTableReader sstable;
+ private FileStructIterator iterator;
+
+ FileStruct(SSTableReader sstable) throws IOException
+ {
+ this.reader = SequenceFile.bufferedReader(sstable.getFilename(), 1024 * 1024); // second argument is presumably the read buffer size (1 MB) -- confirm against SequenceFile
+ this.sstable = sstable;
+ bufIn = new DataInputBuffer();
+ bufOut = new DataOutputBuffer();
+ }
+
+ public String getFileName()
+ {
+ return reader.getFileName();
+ }
+
+ public void close() throws IOException
+ {
+ reader.close();
+ }
+
+ public boolean isExhausted()
+ {
+ return exhausted;
+ }
+
+ public DataInputBuffer getBufIn()
+ {
+ return bufIn;
+ }
+
+ public String getKey()
+ {
+ return key;
+ }
+
+ public int compareTo(FileStruct f)
+ {
+ return sstable.getPartitioner().getDecoratedKeyComparator().compare(key, f.key); // orders by current key only, using this instance's partitioner
+ }
+
+ public void seekTo(String seekKey)
+ {
+ try
+ {
+ long position = sstable.getNearestPosition(seekKey);
+ if (position < 0) // a negative position is treated as "nothing to read for seekKey"
+ {
+ exhausted = true; // NOTE(review): unlike advance(), the reader is not closed on this path -- confirm callers call close()
+ return;
+ }
+ reader.seek(position);
+ advance(); // load the key found at the new position
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException("corrupt sstable", e);
+ }
+ }
+
+ /*
+ * Read the next key from the data file, skipping block indexes.
+ * Caller must check isExhausted after each call to see if further
+ * reads are valid; at end of file the reader is closed and exhausted is set.
+ * Do not mix with calls to the iterator interface (next/hasnext).
+ * @deprecated -- prefer the iterator interface.
+ */
+ public void advance() throws IOException
+ {
+ if (exhausted)
+ {
+ throw new IndexOutOfBoundsException(); // advancing after exhaustion is reported as a caller error
+ }
+
+ bufOut.reset();
+ if (reader.isEOF())
+ {
+ reader.close();
+ exhausted = true;
+ return;
+ }
+
+ long bytesread = reader.next(bufOut);
+ if (bytesread == -1) // -1 is handled the same way as EOF above
+ {
+ reader.close();
+ exhausted = true;
+ return;
+ }
+
+ bufIn.reset(bufOut.getData(), bufOut.getLength()); // expose the bytes just read through bufIn
+ key = bufIn.readUTF(); // the key is read first; bufIn is left positioned just after it
+ }
+
+ public boolean hasNext()
+ {
+ if (iterator == null)
+ iterator = new FileStructIterator(); // created lazily on first use
+ return iterator.hasNext();
+ }
+
+ /** Returns the next key via the lazily created iterator; do not mix with manual calls to advance(). */
+ public String next()
+ {
+ if (iterator == null)
+ iterator = new FileStructIterator(); // created lazily on first use
+ return iterator.next();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ private class FileStructIterator extends AbstractIterator<String>
+ {
+ public FileStructIterator()
+ {
+ if (key == null) // no key has been loaded yet: prime the first one
+ {
+ if (!isExhausted())
+ {
+ forward();
+ }
+ }
+ }
+
+ private void forward() // advance(), with IOException rethrown unchecked
+ {
+ try
+ {
+ advance();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected String computeNext()
+ {
+ if (isExhausted())
+ {
+ return endOfData();
+ }
+ String oldKey = key; // hand back the current key, then pre-load the following one
+ forward();
+ return oldKey;
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java Thu Jul 30 15:30:21 2009
@@ -1,48 +1,48 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.DataInputStream;
-
-/**
- * Allows for the controlled serialization/deserialization of a given type.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface ICompactSerializer<T>
-{
- /**
- * Serialize the specified type into the specified DataOutputStream instance.
- * @param t type that needs to be serialized
- * @param dos DataOutput into which serialization needs to happen.
- * @throws IOException
- */
- public void serialize(T t, DataOutputStream dos) throws IOException;
-
- /**
- * Deserialize into the specified DataInputStream instance.
- * @param dis DataInput from which deserialization needs to happen.
- * @throws IOException
- * @return the type that was deserialized
- */
- public T deserialize(DataInputStream dis) throws IOException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.DataInputStream;
+
+/**
+ * Allows for the controlled serialization/deserialization of a given type.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface ICompactSerializer<T>
+{
+ /**
+ * Serialize the specified object into the specified DataOutputStream instance.
+ * @param t object that needs to be serialized
+ * @param dos DataOutputStream into which serialization needs to happen.
+ * @throws IOException
+ */
+ public void serialize(T t, DataOutputStream dos) throws IOException;
+
+ /**
+ * Deserialize from the specified DataInputStream instance.
+ * @param dis DataInputStream from which deserialization needs to happen.
+ * @throws IOException
+ * @return the object that was deserialized
+ */
+ public T deserialize(DataInputStream dis) throws IOException;
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java Thu Jul 30 15:30:21 2009
@@ -1,83 +1,83 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.SortedSet;
-
-/**
- * Interface to read from the SequenceFile abstraction.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface IFileReader
-{
- public String getFileName();
- public long getEOF() throws IOException;
- public long getCurrentPosition() throws IOException;
- public boolean isHealthyFileDescriptor() throws IOException;
- public void seek(long position) throws IOException;
- public boolean isEOF() throws IOException;
-
- /**
- * Be extremely careful while using this API. This currently
- * used to read the commit log header from the commit logs.
- * Treat this as an internal API.
- *
- * @param bytes read into this byte array.
- */
- public void readDirect(byte[] bytes) throws IOException;
-
- /**
- * Read a long value from the underlying sub system.
- * @return value read
- * @throws IOException
- */
- public long readLong() throws IOException;
-
- /**
- * This method dumps the next key/value into the DataOuputStream
- * passed in.
- *
- * @param bufOut DataOutputStream that needs to be filled.
- * @return number of bytes read.
- * @throws IOException
- */
- public long next(DataOutputBuffer bufOut) throws IOException;
-
- /**
- * This method dumps the next key/value into the DataOuputStream
- * passed in. Always use this method to query for application
- * specific data as it will have indexes.
- *
- * @param key - key we are interested in.
- * @param bufOut - DataOutputStream that needs to be filled.
- * @param columnFamilyName The name of the column family only without the ":"
- * @param columnNames - The list of columns in the cfName column family
- * that we want to return
- */
- public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> columnNames, long position) throws IOException;
-
- /**
- * Close the file after reading.
- * @throws IOException
- */
- public void close() throws IOException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.SortedSet;
+
+/**
+ * Interface to read from the SequenceFile abstraction.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFileReader
+{
+ public String getFileName();
+ public long getEOF() throws IOException;
+ public long getCurrentPosition() throws IOException;
+ public boolean isHealthyFileDescriptor() throws IOException;
+ public void seek(long position) throws IOException;
+ public boolean isEOF() throws IOException;
+
+ /**
+ * Be extremely careful while using this API. This is currently
+ * used to read the commit log header from the commit logs.
+ * Treat this as an internal API.
+ *
+ * @param bytes read into this byte array.
+ */
+ public void readDirect(byte[] bytes) throws IOException;
+
+ /**
+ * Read a long value from the underlying sub system.
+ * @return value read
+ * @throws IOException
+ */
+ public long readLong() throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in.
+ *
+ * @param bufOut DataOutputBuffer that needs to be filled.
+ * @return number of bytes read.
+ * @throws IOException
+ */
+ public long next(DataOutputBuffer bufOut) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOutputBuffer
+ * passed in. Always use this method to query for application
+ * specific data as it will have indexes.
+ *
+ * @param key - key we are interested in.
+ * @param bufOut - DataOutputBuffer that needs to be filled.
+ * @param columnFamilyName The name of the column family only without the ":"
+ * @param columnNames - The list of columns in the columnFamilyName column family
+ * that we want to return
+ */
+ public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> columnNames, long position) throws IOException;
+
+ /**
+ * Close the file after reading.
+ * @throws IOException
+ */
+ public void close() throws IOException;
+}