You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC

svn commit: r799331 [16/29] - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedInputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedInputStream.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedInputStream.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedInputStream.java Thu Jul 30 15:30:21 2009
@@ -1,486 +1,486 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.*;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-/**
- * A <code>BufferedInputStream</code> adds functionality to another input
- * stream-namely, the ability to buffer the input and to support the
- * <code>mark</code> and <code>reset</code> methods. When the
- * <code>BufferedInputStream</code> is created, an internal buffer array is
- * created. As bytes from the stream are read or skipped, the internal buffer is
- * refilled as necessary from the contained input stream, many bytes at a time.
- * The <code>mark</code> operation remembers a point in the input stream and
- * the <code>reset</code> operation causes all the bytes read since the most
- * recent <code>mark</code> operation to be reread before new bytes are taken
- * from the contained input stream.
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class FastBufferedInputStream extends FilterInputStream
-{
-    
-    private static int defaultBufferSize = 8192;
-    
-    /**
-     * The internal buffer array where the data is stored. When necessary, it
-     * may be replaced by another array of a different size.
-     */
-    protected volatile byte buf[];
-    
-    /**
-     * Atomic updater to provide compareAndSet for buf. This is necessary
-     * because closes can be asynchronous. We use nullness of buf[] as primary
-     * indicator that this stream is closed. (The "in" field is also nulled out
-     * on close.)
-     */
-    private static final AtomicReferenceFieldUpdater<FastBufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater
-    .newUpdater(FastBufferedInputStream.class, byte[].class, "buf");
-    
-    /**
-     * The index one greater than the index of the last valid byte in the
-     * buffer. This value is always in the range <code>0</code> through
-     * <code>buf.length</code>; elements <code>buf[0]</code> through
-     * <code>buf[count-1]
-     * </code>contain buffered input data obtained from the
-     * underlying input stream.
-     */
-    protected int count;
-    
-    /**
-     * The current position in the buffer. This is the index of the next
-     * character to be read from the <code>buf</code> array.
-     * <p>
-     * This value is always in the range <code>0</code> through
-     * <code>count</code>. If it is less than <code>count</code>, then
-     * <code>buf[pos]</code> is the next byte to be supplied as input; if it
-     * is equal to <code>count</code>, then the next <code>read</code> or
-     * <code>skip</code> operation will require more bytes to be read from the
-     * contained input stream.
-     * 
-     * @see java.io.BufferedInputStream#buf
-     */
-    protected int pos;
-    
-    /**
-     * The value of the <code>pos</code> field at the time the last
-     * <code>mark</code> method was called.
-     * <p>
-     * This value is always in the range <code>-1</code> through
-     * <code>pos</code>. If there is no marked position in the input stream,
-     * this field is <code>-1</code>. If there is a marked position in the
-     * input stream, then <code>buf[markpos]</code> is the first byte to be
-     * supplied as input after a <code>reset</code> operation. If
-     * <code>markpos</code> is not <code>-1</code>, then all bytes from
-     * positions <code>buf[markpos]</code> through <code>buf[pos-1]</code>
-     * must remain in the buffer array (though they may be moved to another
-     * place in the buffer array, with suitable adjustments to the values of
-     * <code>count</code>, <code>pos</code>, and <code>markpos</code>);
-     * they may not be discarded unless and until the difference between
-     * <code>pos</code> and <code>markpos</code> exceeds
-     * <code>marklimit</code>.
-     * 
-     * @see java.io.BufferedInputStream#mark(int)
-     * @see java.io.BufferedInputStream#pos
-     */
-    protected int markpos = -1;
-    
-    /**
-     * The maximum read ahead allowed after a call to the <code>mark</code>
-     * method before subsequent calls to the <code>reset</code> method fail.
-     * Whenever the difference between <code>pos</code> and
-     * <code>markpos</code> exceeds <code>marklimit</code>, then the mark
-     * may be dropped by setting <code>markpos</code> to <code>-1</code>.
-     * 
-     * @see java.io.BufferedInputStream#mark(int)
-     * @see java.io.BufferedInputStream#reset()
-     */
-    protected int marklimit;
-    
-    /**
-     * Check to make sure that underlying input stream has not been nulled out
-     * due to close; if not return it;
-     */
-    private InputStream getInIfOpen() throws IOException
-    {
-        InputStream input = in;
-        if (input == null)
-            throw new IOException("Stream closed");
-        return input;
-    }
-    
-    /**
-     * Check to make sure that buffer has not been nulled out due to close; if
-     * not return it;
-     */
-    private byte[] getBufIfOpen() throws IOException
-    {
-        byte[] buffer = buf;
-        if (buffer == null)
-            throw new IOException("Stream closed");
-        return buffer;
-    }
-    
-    /**
-     * Creates a <code>BufferedInputStream</code> and saves its argument, the
-     * input stream <code>in</code>, for later use. An internal buffer array
-     * is created and stored in <code>buf</code>.
-     * 
-     * @param in
-     *            the underlying input stream.
-     */
-    public FastBufferedInputStream(InputStream in)
-    {
-        this(in, defaultBufferSize);
-    }
-    
-    /**
-     * Creates a <code>BufferedInputStream</code> with the specified buffer
-     * size, and saves its argument, the input stream <code>in</code>, for
-     * later use. An internal buffer array of length <code>size</code> is
-     * created and stored in <code>buf</code>.
-     * 
-     * @param in
-     *            the underlying input stream.
-     * @param size
-     *            the buffer size.
-     * @exception IllegalArgumentException
-     *                if size <= 0.
-     */
-    public FastBufferedInputStream(InputStream in, int size)
-    {
-        super(in);
-        if (size <= 0)
-        {
-            throw new IllegalArgumentException("Buffer size <= 0");
-        }
-        buf = new byte[size];
-    }
-    
-    /**
-     * Fills the buffer with more data, taking into account shuffling and other
-     * tricks for dealing with marks. Assumes that it is being called by a
-     * synchronized method. This method also assumes that all data has already
-     * been read in, hence pos > count.
-     */
-    private void fill() throws IOException
-    {
-        byte[] buffer = getBufIfOpen();
-        if (markpos < 0)
-            pos = 0; /* no mark: throw away the buffer */
-        else if (pos >= buffer.length) /* no room left in buffer */
-            if (markpos > 0)
-            { /* can throw away early part of the buffer */
-                int sz = pos - markpos;
-                System.arraycopy(buffer, markpos, buffer, 0, sz);
-                pos = sz;
-                markpos = 0;
-            }
-            else if (buffer.length >= marklimit)
-            {
-                markpos = -1; /* buffer got too big, invalidate mark */
-                pos = 0; /* drop buffer contents */
-            }
-            else
-            { /* grow buffer */
-                int nsz = pos * 2;
-                if (nsz > marklimit)
-                    nsz = marklimit;
-                byte nbuf[] = new byte[nsz];
-                System.arraycopy(buffer, 0, nbuf, 0, pos);
-                if (!bufUpdater.compareAndSet(this, buffer, nbuf))
-                {
-                    // Can't replace buf if there was an async close.
-                    // Note: This would need to be changed if fill()
-                    // is ever made accessible to multiple threads.
-                    // But for now, the only way CAS can fail is via close.
-                    // assert buf == null;
-                    throw new IOException("Stream closed");
-                }
-                buffer = nbuf;
-            }
-        count = pos;
-        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
-        if (n > 0)
-            count = n + pos;
-    }
-    
-    /**
-     * See the general contract of the <code>read</code> method of
-     * <code>InputStream</code>.
-     * 
-     * @return the next byte of data, or <code>-1</code> if the end of the
-     *         stream is reached.
-     * @exception IOException
-     *                if this input stream has been closed by invoking its
-     *                {@link #close()} method, or an I/O error occurs.
-     * @see java.io.FilterInputStream#in
-     */
-    public  int read() throws IOException
-    {
-        if (pos >= count)
-        {
-            fill();
-            if (pos >= count)
-                return -1;
-        }
-        return getBufIfOpen()[pos++] & 0xff;
-    }
-    
-    /**
-     * Read characters into a portion of an array, reading from the underlying
-     * stream at most once if necessary.
-     */
-    private int read1(byte[] b, int off, int len) throws IOException
-    {
-        int avail = count - pos;
-        if (avail <= 0)
-        {
-            /*
-             * If the requested length is at least as large as the buffer, and
-             * if there is no mark/reset activity, do not bother to copy the
-             * bytes into the local buffer. In this way buffered streams will
-             * cascade harmlessly.
-             */
-            if (len >= getBufIfOpen().length && markpos < 0)
-            {
-                return getInIfOpen().read(b, off, len);
-            }
-            fill();
-            avail = count - pos;
-            if (avail <= 0)
-                return -1;
-        }
-        int cnt = (avail < len) ? avail : len;
-        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
-        pos += cnt;
-        return cnt;
-    }
-    
-    /**
-     * Reads bytes from this byte-input stream into the specified byte array,
-     * starting at the given offset.
-     * 
-     * <p>
-     * This method implements the general contract of the corresponding
-     * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
-     * the <code>{@link InputStream}</code> class. As an additional
-     * convenience, it attempts to read as many bytes as possible by repeatedly
-     * invoking the <code>read</code> method of the underlying stream. This
-     * iterated <code>read</code> continues until one of the following
-     * conditions becomes true:
-     * <ul>
-     * 
-     * <li> The specified number of bytes have been read,
-     * 
-     * <li> The <code>read</code> method of the underlying stream returns
-     * <code>-1</code>, indicating end-of-file, or
-     * 
-     * <li> The <code>available</code> method of the underlying stream returns
-     * zero, indicating that further input requests would block.
-     * 
-     * </ul>
-     * If the first <code>read</code> on the underlying stream returns
-     * <code>-1</code> to indicate end-of-file then this method returns
-     * <code>-1</code>. Otherwise this method returns the number of bytes
-     * actually read.
-     * 
-     * <p>
-     * Subclasses of this class are encouraged, but not required, to attempt to
-     * read as many bytes as possible in the same fashion.
-     * 
-     * @param b
-     *            destination buffer.
-     * @param off
-     *            offset at which to start storing bytes.
-     * @param len
-     *            maximum number of bytes to read.
-     * @return the number of bytes read, or <code>-1</code> if the end of the
-     *         stream has been reached.
-     * @exception IOException
-     *                if this input stream has been closed by invoking its
-     *                {@link #close()} method, or an I/O error occurs.
-     */
-    public  int read(byte b[], int off, int len) throws IOException
-    {
-        getBufIfOpen(); // Check for closed stream
-        if ((off | len | (off + len) | (b.length - (off + len))) < 0)
-        {
-            throw new IndexOutOfBoundsException();
-        }
-        else if (len == 0)
-        {
-            return 0;
-        }
-        
-        int n = 0;
-        for (;;)
-        {
-            int nread = read1(b, off + n, len - n);
-            if (nread <= 0)
-                return (n == 0) ? nread : n;
-            n += nread;
-            if (n >= len)
-                return n;
-            // if not closed but no bytes available, return
-            InputStream input = in;
-            if (input != null && input.available() <= 0)
-                return n;
-        }
-    }
-    
-    /**
-     * See the general contract of the <code>skip</code> method of
-     * <code>InputStream</code>.
-     * 
-     * @exception IOException
-     *                if the stream does not support seek, or if this input
-     *                stream has been closed by invoking its {@link #close()}
-     *                method, or an I/O error occurs.
-     */
-    public  long skip(long n) throws IOException
-    {
-        getBufIfOpen(); // Check for closed stream
-        if (n <= 0)
-        {
-            return 0;
-        }
-        long avail = count - pos;
-        
-        if (avail <= 0)
-        {
-            // If no mark position set then don't keep in buffer
-            if (markpos < 0)
-                return getInIfOpen().skip(n);
-            
-            // Fill in buffer to save bytes for reset
-            fill();
-            avail = count - pos;
-            if (avail <= 0)
-                return 0;
-        }
-        
-        long skipped = (avail < n) ? avail : n;
-        pos += skipped;
-        return skipped;
-    }
-    
-    /**
-     * Returns an estimate of the number of bytes that can be read (or skipped
-     * over) from this input stream without blocking by the next invocation of a
-     * method for this input stream. The next invocation might be the same
-     * thread or another thread. A single read or skip of this many bytes will
-     * not block, but may read or skip fewer bytes.
-     * <p>
-     * This method returns the sum of the number of bytes remaining to be read
-     * in the buffer (<code>count&nbsp;- pos</code>) and the result of
-     * calling the {@link java.io.FilterInputStream#in in}.available().
-     * 
-     * @return an estimate of the number of bytes that can be read (or skipped
-     *         over) from this input stream without blocking.
-     * @exception IOException
-     *                if this input stream has been closed by invoking its
-     *                {@link #close()} method, or an I/O error occurs.
-     */
-    public  int available() throws IOException
-    {
-        return getInIfOpen().available() + (count - pos);
-    }
-    
-    /**
-     * See the general contract of the <code>mark</code> method of
-     * <code>InputStream</code>.
-     * 
-     * @param readlimit
-     *            the maximum limit of bytes that can be read before the mark
-     *            position becomes invalid.
-     * @see java.io.BufferedInputStream#reset()
-     */
-    public  void mark(int readlimit)
-    {
-        marklimit = readlimit;
-        markpos = pos;
-    }
-    
-    /**
-     * See the general contract of the <code>reset</code> method of
-     * <code>InputStream</code>.
-     * <p>
-     * If <code>markpos</code> is <code>-1</code> (no mark has been set or
-     * the mark has been invalidated), an <code>IOException</code> is thrown.
-     * Otherwise, <code>pos</code> is set equal to <code>markpos</code>.
-     * 
-     * @exception IOException
-     *                if this stream has not been marked or, if the mark has
-     *                been invalidated, or the stream has been closed by
-     *                invoking its {@link #close()} method, or an I/O error
-     *                occurs.
-     * @see java.io.BufferedInputStream#mark(int)
-     */
-    public  void reset() throws IOException
-    {
-        getBufIfOpen(); // Cause exception if closed
-        if (markpos < 0)
-            throw new IOException("Resetting to invalid mark");
-        pos = markpos;
-    }
-    
-    /**
-     * Tests if this input stream supports the <code>mark</code> and
-     * <code>reset</code> methods. The <code>markSupported</code> method of
-     * <code>BufferedInputStream</code> returns <code>true</code>.
-     * 
-     * @return a <code>boolean</code> indicating if this stream type supports
-     *         the <code>mark</code> and <code>reset</code> methods.
-     * @see java.io.InputStream#mark(int)
-     * @see java.io.InputStream#reset()
-     */
-    public boolean markSupported()
-    {
-        return true;
-    }
-    
-    /**
-     * Closes this input stream and releases any system resources associated
-     * with the stream. Once the stream has been closed, further read(),
-     * available(), reset(), or skip() invocations will throw an IOException.
-     * Closing a previously closed stream has no effect.
-     * 
-     * @exception IOException
-     *                if an I/O error occurs.
-     */
-    public void close() throws IOException
-    {
-        byte[] buffer;
-        while ((buffer = buf) != null)
-        {
-            if (bufUpdater.compareAndSet(this, buffer, null))
-            {
-                InputStream input = in;
-                in = null;
-                if (input != null)
-                    input.close();
-                return;
-            }
-            // Else retry in case a new buf was CASed in fill()
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * A <code>BufferedInputStream</code> adds functionality to another input
+ * stream-namely, the ability to buffer the input and to support the
+ * <code>mark</code> and <code>reset</code> methods. When the
+ * <code>BufferedInputStream</code> is created, an internal buffer array is
+ * created. As bytes from the stream are read or skipped, the internal buffer is
+ * refilled as necessary from the contained input stream, many bytes at a time.
+ * The <code>mark</code> operation remembers a point in the input stream and
+ * the <code>reset</code> operation causes all the bytes read since the most
+ * recent <code>mark</code> operation to be reread before new bytes are taken
+ * from the contained input stream.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedInputStream extends FilterInputStream
+{
+    
+    private static int defaultBufferSize = 8192;
+    
+    /**
+     * The internal buffer array where the data is stored. When necessary, it
+     * may be replaced by another array of a different size.
+     */
+    protected volatile byte buf[];
+    
+    /**
+     * Atomic updater to provide compareAndSet for buf. This is necessary
+     * because closes can be asynchronous. We use nullness of buf[] as primary
+     * indicator that this stream is closed. (The "in" field is also nulled out
+     * on close.)
+     */
+    private static final AtomicReferenceFieldUpdater<FastBufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater
+    .newUpdater(FastBufferedInputStream.class, byte[].class, "buf");
+    
+    /**
+     * The index one greater than the index of the last valid byte in the
+     * buffer. This value is always in the range <code>0</code> through
+     * <code>buf.length</code>; elements <code>buf[0]</code> through
+     * <code>buf[count-1]
+     * </code>contain buffered input data obtained from the
+     * underlying input stream.
+     */
+    protected int count;
+    
+    /**
+     * The current position in the buffer. This is the index of the next
+     * character to be read from the <code>buf</code> array.
+     * <p>
+     * This value is always in the range <code>0</code> through
+     * <code>count</code>. If it is less than <code>count</code>, then
+     * <code>buf[pos]</code> is the next byte to be supplied as input; if it
+     * is equal to <code>count</code>, then the next <code>read</code> or
+     * <code>skip</code> operation will require more bytes to be read from the
+     * contained input stream.
+     * 
+     * @see java.io.BufferedInputStream#buf
+     */
+    protected int pos;
+    
+    /**
+     * The value of the <code>pos</code> field at the time the last
+     * <code>mark</code> method was called.
+     * <p>
+     * This value is always in the range <code>-1</code> through
+     * <code>pos</code>. If there is no marked position in the input stream,
+     * this field is <code>-1</code>. If there is a marked position in the
+     * input stream, then <code>buf[markpos]</code> is the first byte to be
+     * supplied as input after a <code>reset</code> operation. If
+     * <code>markpos</code> is not <code>-1</code>, then all bytes from
+     * positions <code>buf[markpos]</code> through <code>buf[pos-1]</code>
+     * must remain in the buffer array (though they may be moved to another
+     * place in the buffer array, with suitable adjustments to the values of
+     * <code>count</code>, <code>pos</code>, and <code>markpos</code>);
+     * they may not be discarded unless and until the difference between
+     * <code>pos</code> and <code>markpos</code> exceeds
+     * <code>marklimit</code>.
+     * 
+     * @see java.io.BufferedInputStream#mark(int)
+     * @see java.io.BufferedInputStream#pos
+     */
+    protected int markpos = -1;
+    
+    /**
+     * The maximum read ahead allowed after a call to the <code>mark</code>
+     * method before subsequent calls to the <code>reset</code> method fail.
+     * Whenever the difference between <code>pos</code> and
+     * <code>markpos</code> exceeds <code>marklimit</code>, then the mark
+     * may be dropped by setting <code>markpos</code> to <code>-1</code>.
+     * 
+     * @see java.io.BufferedInputStream#mark(int)
+     * @see java.io.BufferedInputStream#reset()
+     */
+    protected int marklimit;
+    
+    /**
+     * Check to make sure that underlying input stream has not been nulled out
+     * due to close; if not return it;
+     */
+    private InputStream getInIfOpen() throws IOException
+    {
+        InputStream input = in;
+        if (input == null)
+            throw new IOException("Stream closed");
+        return input;
+    }
+    
+    /**
+     * Check to make sure that buffer has not been nulled out due to close; if
+     * not return it;
+     */
+    private byte[] getBufIfOpen() throws IOException
+    {
+        byte[] buffer = buf;
+        if (buffer == null)
+            throw new IOException("Stream closed");
+        return buffer;
+    }
+    
+    /**
+     * Creates a <code>BufferedInputStream</code> and saves its argument, the
+     * input stream <code>in</code>, for later use. An internal buffer array
+     * is created and stored in <code>buf</code>.
+     * 
+     * @param in
+     *            the underlying input stream.
+     */
+    public FastBufferedInputStream(InputStream in)
+    {
+        this(in, defaultBufferSize);
+    }
+    
+    /**
+     * Creates a <code>BufferedInputStream</code> with the specified buffer
+     * size, and saves its argument, the input stream <code>in</code>, for
+     * later use. An internal buffer array of length <code>size</code> is
+     * created and stored in <code>buf</code>.
+     * 
+     * @param in
+     *            the underlying input stream.
+     * @param size
+     *            the buffer size.
+     * @exception IllegalArgumentException
+     *                if size <= 0.
+     */
+    public FastBufferedInputStream(InputStream in, int size)
+    {
+        super(in);
+        if (size <= 0)
+        {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+    
+    /**
+     * Fills the buffer with more data, taking into account shuffling and other
+     * tricks for dealing with marks. Assumes that it is being called by a
+     * synchronized method. This method also assumes that all data has already
+     * been read in, hence pos > count.
+     */
+    private void fill() throws IOException
+    {
+        byte[] buffer = getBufIfOpen();
+        if (markpos < 0)
+            pos = 0; /* no mark: throw away the buffer */
+        else if (pos >= buffer.length) /* no room left in buffer */
+            if (markpos > 0)
+            { /* can throw away early part of the buffer */
+                int sz = pos - markpos;
+                System.arraycopy(buffer, markpos, buffer, 0, sz);
+                pos = sz;
+                markpos = 0;
+            }
+            else if (buffer.length >= marklimit)
+            {
+                markpos = -1; /* buffer got too big, invalidate mark */
+                pos = 0; /* drop buffer contents */
+            }
+            else
+            { /* grow buffer */
+                int nsz = pos * 2;
+                if (nsz > marklimit)
+                    nsz = marklimit;
+                byte nbuf[] = new byte[nsz];
+                System.arraycopy(buffer, 0, nbuf, 0, pos);
+                if (!bufUpdater.compareAndSet(this, buffer, nbuf))
+                {
+                    // Can't replace buf if there was an async close.
+                    // Note: This would need to be changed if fill()
+                    // is ever made accessible to multiple threads.
+                    // But for now, the only way CAS can fail is via close.
+                    // assert buf == null;
+                    throw new IOException("Stream closed");
+                }
+                buffer = nbuf;
+            }
+        count = pos;
+        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
+        if (n > 0)
+            count = n + pos;
+    }
+    
+    /**
+     * See the general contract of the <code>read</code> method of
+     * <code>InputStream</code>.
+     * 
+     * @return the next byte of data, or <code>-1</code> if the end of the
+     *         stream is reached.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     * @see java.io.FilterInputStream#in
+     */
+    public  int read() throws IOException
+    {
+        if (pos >= count)
+        {
+            fill();
+            if (pos >= count)
+                return -1;
+        }
+        return getBufIfOpen()[pos++] & 0xff;
+    }
+    
+    /**
+     * Read characters into a portion of an array, reading from the underlying
+     * stream at most once if necessary.
+     */
+    private int read1(byte[] b, int off, int len) throws IOException
+    {
+        int avail = count - pos;
+        if (avail <= 0)
+        {
+            /*
+             * If the requested length is at least as large as the buffer, and
+             * if there is no mark/reset activity, do not bother to copy the
+             * bytes into the local buffer. In this way buffered streams will
+             * cascade harmlessly.
+             */
+            if (len >= getBufIfOpen().length && markpos < 0)
+            {
+                return getInIfOpen().read(b, off, len);
+            }
+            fill();
+            avail = count - pos;
+            if (avail <= 0)
+                return -1;
+        }
+        int cnt = (avail < len) ? avail : len;
+        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
+        pos += cnt;
+        return cnt;
+    }
+    
+    /**
+     * Reads bytes from this byte-input stream into the specified byte array,
+     * starting at the given offset.
+     * 
+     * <p>
+     * This method implements the general contract of the corresponding
+     * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
+     * the <code>{@link InputStream}</code> class. As an additional
+     * convenience, it attempts to read as many bytes as possible by repeatedly
+     * invoking the <code>read</code> method of the underlying stream. This
+     * iterated <code>read</code> continues until one of the following
+     * conditions becomes true:
+     * <ul>
+     * 
+     * <li> The specified number of bytes have been read,
+     * 
+     * <li> The <code>read</code> method of the underlying stream returns
+     * <code>-1</code>, indicating end-of-file, or
+     * 
+     * <li> The <code>available</code> method of the underlying stream returns
+     * zero, indicating that further input requests would block.
+     * 
+     * </ul>
+     * If the first <code>read</code> on the underlying stream returns
+     * <code>-1</code> to indicate end-of-file then this method returns
+     * <code>-1</code>. Otherwise this method returns the number of bytes
+     * actually read.
+     * 
+     * <p>
+     * Subclasses of this class are encouraged, but not required, to attempt to
+     * read as many bytes as possible in the same fashion.
+     * 
+     * @param b
+     *            destination buffer.
+     * @param off
+     *            offset at which to start storing bytes.
+     * @param len
+     *            maximum number of bytes to read.
+     * @return the number of bytes read, or <code>-1</code> if the end of the
+     *         stream has been reached.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     */
+    public  int read(byte b[], int off, int len) throws IOException
+    {
+        getBufIfOpen(); // Check for closed stream
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0)
+        {
+            throw new IndexOutOfBoundsException();
+        }
+        else if (len == 0)
+        {
+            return 0;
+        }
+        
+        int n = 0;
+        for (;;)
+        {
+            int nread = read1(b, off + n, len - n);
+            if (nread <= 0)
+                return (n == 0) ? nread : n;
+            n += nread;
+            if (n >= len)
+                return n;
+            // if not closed but no bytes available, return
+            InputStream input = in;
+            if (input != null && input.available() <= 0)
+                return n;
+        }
+    }
+    
+    /**
+     * See the general contract of the <code>skip</code> method of
+     * <code>InputStream</code>.
+     * 
+     * @exception IOException
+     *                if the stream does not support seek, or if this input
+     *                stream has been closed by invoking its {@link #close()}
+     *                method, or an I/O error occurs.
+     */
+    public  long skip(long n) throws IOException
+    {
+        getBufIfOpen(); // Check for closed stream
+        if (n <= 0)
+        {
+            return 0;
+        }
+        long avail = count - pos;
+        
+        if (avail <= 0)
+        {
+            // If no mark position set then don't keep in buffer
+            if (markpos < 0)
+                return getInIfOpen().skip(n);
+            
+            // Fill in buffer to save bytes for reset
+            fill();
+            avail = count - pos;
+            if (avail <= 0)
+                return 0;
+        }
+        
+        long skipped = (avail < n) ? avail : n;
+        pos += skipped;
+        return skipped;
+    }
+    
+    /**
+     * Returns an estimate of the number of bytes that can be read (or skipped
+     * over) from this input stream without blocking by the next invocation of a
+     * method for this input stream. The next invocation might be the same
+     * thread or another thread. A single read or skip of this many bytes will
+     * not block, but may read or skip fewer bytes.
+     * <p>
+     * This method returns the sum of the number of bytes remaining to be read
+     * in the buffer (<code>count&nbsp;- pos</code>) and the result of
+     * calling the {@link java.io.FilterInputStream#in in}.available().
+     * 
+     * @return an estimate of the number of bytes that can be read (or skipped
+     *         over) from this input stream without blocking.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     */
+    public  int available() throws IOException
+    {
+        return getInIfOpen().available() + (count - pos);
+    }
+    
+    /**
+     * See the general contract of the <code>mark</code> method of
+     * <code>InputStream</code>.
+     * 
+     * @param readlimit
+     *            the maximum limit of bytes that can be read before the mark
+     *            position becomes invalid.
+     * @see java.io.BufferedInputStream#reset()
+     */
+    public  void mark(int readlimit)
+    {
+        marklimit = readlimit;
+        markpos = pos;
+    }
+    
+    /**
+     * See the general contract of the <code>reset</code> method of
+     * <code>InputStream</code>.
+     * <p>
+     * If <code>markpos</code> is <code>-1</code> (no mark has been set or
+     * the mark has been invalidated), an <code>IOException</code> is thrown.
+     * Otherwise, <code>pos</code> is set equal to <code>markpos</code>.
+     * 
+     * @exception IOException
+     *                if this stream has not been marked or, if the mark has
+     *                been invalidated, or the stream has been closed by
+     *                invoking its {@link #close()} method, or an I/O error
+     *                occurs.
+     * @see java.io.BufferedInputStream#mark(int)
+     */
+    public  void reset() throws IOException
+    {
+        getBufIfOpen(); // Cause exception if closed
+        if (markpos < 0)
+            throw new IOException("Resetting to invalid mark");
+        pos = markpos;
+    }
+    
+    /**
+     * Tests if this input stream supports the <code>mark</code> and
+     * <code>reset</code> methods. The <code>markSupported</code> method of
+     * <code>BufferedInputStream</code> returns <code>true</code>.
+     * 
+     * @return a <code>boolean</code> indicating if this stream type supports
+     *         the <code>mark</code> and <code>reset</code> methods.
+     * @see java.io.InputStream#mark(int)
+     * @see java.io.InputStream#reset()
+     */
+    public boolean markSupported()
+    {
+        return true;
+    }
+    
+    /**
+     * Closes this input stream and releases any system resources associated
+     * with the stream. Once the stream has been closed, further read(),
+     * available(), reset(), or skip() invocations will throw an IOException.
+     * Closing a previously closed stream has no effect.
+     * 
+     * @exception IOException
+     *                if an I/O error occurs.
+     */
+    public void close() throws IOException
+    {
+        byte[] buffer;
+        while ((buffer = buf) != null)
+        {
+            if (bufUpdater.compareAndSet(this, buffer, null))
+            {
+                InputStream input = in;
+                in = null;
+                if (input != null)
+                    input.close();
+                return;
+            }
+            // Else retry in case a new buf was CASed in fill()
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedOutputStream.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedOutputStream.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FastBufferedOutputStream.java Thu Jul 30 15:30:21 2009
@@ -1,162 +1,162 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.*;
-
-/**
- * The class implements a buffered output stream. By setting up such an output
- * stream, an application can write bytes to the underlying output stream
- * without necessarily causing a call to the underlying system for each byte
- * written.
- * 
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public class FastBufferedOutputStream extends FilterOutputStream
-{
-    /**
-     * The internal buffer where data is stored.
-     */
-    protected byte buf[];
-    
-    /**
-     * The number of valid bytes in the buffer. This value is always in the
-     * range <tt>0</tt> through <tt>buf.length</tt>; elements
-     * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte
-     * data.
-     */
-    protected int count;
-    
-    /**
-     * Creates a new buffered output stream to write data to the specified
-     * underlying output stream.
-     * 
-     * @param out
-     *            the underlying output stream.
-     */
-    public FastBufferedOutputStream(OutputStream out)
-    {
-        this(out, 8192);
-    }
-    
-    /**
-     * Creates a new buffered output stream to write data to the specified
-     * underlying output stream with the specified buffer size.
-     * 
-     * @param out
-     *            the underlying output stream.
-     * @param size
-     *            the buffer size.
-     * @exception IllegalArgumentException
-     *                if size &lt;= 0.
-     */
-    public FastBufferedOutputStream(OutputStream out, int size)
-    {
-        super(out);
-        if (size <= 0)
-        {
-            throw new IllegalArgumentException("Buffer size <= 0");
-        }
-        buf = new byte[size];
-    }
-    
-    /** Flush the internal buffer */
-    private void flushBuffer() throws IOException
-    {
-        if (count > 0)
-        {
-            out.write(buf, 0, count);
-            count = 0;
-        }
-    }
-    
-    /**
-     * Writes the specified byte to this buffered output stream.
-     * 
-     * @param b
-     *            the byte to be written.
-     * @exception IOException
-     *                if an I/O error occurs.
-     */
-    public void write(int b) throws IOException
-    {
-        if (count >= buf.length)
-        {
-            flushBuffer();
-        }
-        buf[count++] = (byte) b;
-    }
-    
-    /**
-     * Writes <code>len</code> bytes from the specified byte array starting at
-     * offset <code>off</code> to this buffered output stream.
-     * 
-     * <p>
-     * Ordinarily this method stores bytes from the given array into this
-     * stream's buffer, flushing the buffer to the underlying output stream as
-     * needed. If the requested length is at least as large as this stream's
-     * buffer, however, then this method will flush the buffer and write the
-     * bytes directly to the underlying output stream. Thus redundant
-     * <code>BufferedOutputStream</code>s will not copy data unnecessarily.
-     * 
-     * @param b
-     *            the data.
-     * @param off
-     *            the start offset in the data.
-     * @param len
-     *            the number of bytes to write.
-     * @exception IOException
-     *                if an I/O error occurs.
-     */
-    public void write(byte b[], int off, int len)
-    throws IOException
-    {
-        if (len >= buf.length)
-        {
-            /*
-             * If the request length exceeds the size of the output buffer,
-             * flush the output buffer and then write the data directly. In this
-             * way buffered streams will cascade harmlessly.
-             */
-            flushBuffer();
-            out.write(b, off, len);
-            return;
-        }
-        if (len > buf.length - count)
-        {
-            flushBuffer();
-        }
-        System.arraycopy(b, off, buf, count, len);
-        count += len;
-    }
-    
-    /**
-     * Flushes this buffered output stream. This forces any buffered output
-     * bytes to be written out to the underlying output stream.
-     * 
-     * @exception IOException
-     *                if an I/O error occurs.
-     * @see java.io.FilterOutputStream#out
-     */
-    public void flush() throws IOException
-    {
-        flushBuffer();
-        out.flush();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+/**
+ * The class implements a buffered output stream. By setting up such an output
+ * stream, an application can write bytes to the underlying output stream
+ * without necessarily causing a call to the underlying system for each byte
+ * written.
+ * 
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedOutputStream extends FilterOutputStream
+{
+    /**
+     * The internal buffer where data is stored.
+     */
+    protected byte buf[];
+    
+    /**
+     * The number of valid bytes in the buffer. This value is always in the
+     * range <tt>0</tt> through <tt>buf.length</tt>; elements
+     * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte
+     * data.
+     */
+    protected int count;
+    
+    /**
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream.
+     * 
+     * @param out
+     *            the underlying output stream.
+     */
+    public FastBufferedOutputStream(OutputStream out)
+    {
+        this(out, 8192);
+    }
+    
+    /**
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream with the specified buffer size.
+     * 
+     * @param out
+     *            the underlying output stream.
+     * @param size
+     *            the buffer size.
+     * @exception IllegalArgumentException
+     *                if size &lt;= 0.
+     */
+    public FastBufferedOutputStream(OutputStream out, int size)
+    {
+        super(out);
+        if (size <= 0)
+        {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+    
+    /** Flush the internal buffer */
+    private void flushBuffer() throws IOException
+    {
+        if (count > 0)
+        {
+            out.write(buf, 0, count);
+            count = 0;
+        }
+    }
+    
+    /**
+     * Writes the specified byte to this buffered output stream.
+     * 
+     * @param b
+     *            the byte to be written.
+     * @exception IOException
+     *                if an I/O error occurs.
+     */
+    public void write(int b) throws IOException
+    {
+        if (count >= buf.length)
+        {
+            flushBuffer();
+        }
+        buf[count++] = (byte) b;
+    }
+    
+    /**
+     * Writes <code>len</code> bytes from the specified byte array starting at
+     * offset <code>off</code> to this buffered output stream.
+     * 
+     * <p>
+     * Ordinarily this method stores bytes from the given array into this
+     * stream's buffer, flushing the buffer to the underlying output stream as
+     * needed. If the requested length is at least as large as this stream's
+     * buffer, however, then this method will flush the buffer and write the
+     * bytes directly to the underlying output stream. Thus redundant
+     * <code>BufferedOutputStream</code>s will not copy data unnecessarily.
+     * 
+     * @param b
+     *            the data.
+     * @param off
+     *            the start offset in the data.
+     * @param len
+     *            the number of bytes to write.
+     * @exception IOException
+     *                if an I/O error occurs.
+     */
+    public void write(byte b[], int off, int len)
+    throws IOException
+    {
+        if (len >= buf.length)
+        {
+            /*
+             * If the request length exceeds the size of the output buffer,
+             * flush the output buffer and then write the data directly. In this
+             * way buffered streams will cascade harmlessly.
+             */
+            flushBuffer();
+            out.write(b, off, len);
+            return;
+        }
+        if (len > buf.length - count)
+        {
+            flushBuffer();
+        }
+        System.arraycopy(b, off, buf, count, len);
+        count += len;
+    }
+    
+    /**
+     * Flushes this buffered output stream. This forces any buffered output
+     * bytes to be written out to the underlying output stream.
+     * 
+     * @exception IOException
+     *                if an I/O error occurs.
+     * @see java.io.FilterOutputStream#out
+     */
+    public void flush() throws IOException
+    {
+        flushBuffer();
+        out.flush();
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/FileStruct.java Thu Jul 30 15:30:21 2009
@@ -1,192 +1,192 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.SSTableReader;
-
-import org.apache.log4j.Logger;
-import com.google.common.collect.AbstractIterator;
-
-
-public class FileStruct implements Comparable<FileStruct>, Iterator<String>
-{
-    private static Logger logger = Logger.getLogger(FileStruct.class);
-
-    private String key = null; // decorated!
-    private boolean exhausted = false;
-    private IFileReader reader;
-    private DataInputBuffer bufIn;
-    private DataOutputBuffer bufOut;
-    private SSTableReader sstable;
-    private FileStructIterator iterator;
-
-    FileStruct(SSTableReader sstable) throws IOException
-    {
-        this.reader = SequenceFile.bufferedReader(sstable.getFilename(), 1024 * 1024);
-        this.sstable = sstable;
-        bufIn = new DataInputBuffer();
-        bufOut = new DataOutputBuffer();
-    }
-
-    public String getFileName()
-    {
-        return reader.getFileName();
-    }
-
-    public void close() throws IOException
-    {
-        reader.close();
-    }
-
-    public boolean isExhausted()
-    {
-        return exhausted;
-    }
-
-    public DataInputBuffer getBufIn()
-    {
-        return bufIn;
-    }
-
-    public String getKey()
-    {
-        return key;
-    }
-
-    public int compareTo(FileStruct f)
-    {
-        return sstable.getPartitioner().getDecoratedKeyComparator().compare(key, f.key);
-    }    
-
-    public void seekTo(String seekKey)
-    {
-        try
-        {
-            long position = sstable.getNearestPosition(seekKey);
-            if (position < 0)
-            {
-                exhausted = true;
-                return;
-            }
-            reader.seek(position);
-            advance();
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException("corrupt sstable", e);
-        }
-    }
-
-    /*
-     * Read the next key from the data file, skipping block indexes.
-     * Caller must check isExhausted after each call to see if further
-     * reads are valid.
-     * Do not mix with calls to the iterator interface (next/hasnext).
-     * @deprecated -- prefer the iterator interface.
-     */
-    public void advance() throws IOException
-    {
-        if (exhausted)
-        {
-            throw new IndexOutOfBoundsException();
-        }
-
-        bufOut.reset();
-        if (reader.isEOF())
-        {
-            reader.close();
-            exhausted = true;
-            return;
-        }
-
-        long bytesread = reader.next(bufOut);
-        if (bytesread == -1)
-        {
-            reader.close();
-            exhausted = true;
-            return;
-        }
-
-        bufIn.reset(bufOut.getData(), bufOut.getLength());
-        key = bufIn.readUTF();
-    }
-
-    public boolean hasNext()
-    {
-        if (iterator == null)
-            iterator = new FileStructIterator();
-        return iterator.hasNext();
-    }
-
-    /** do not mix with manual calls to advance(). */
-    public String next()
-    {
-        if (iterator == null)
-            iterator = new FileStructIterator();
-        return iterator.next();
-    }
-
-    public void remove()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    private class FileStructIterator extends AbstractIterator<String>
-    {
-        public FileStructIterator()
-        {
-            if (key == null)
-            {
-                if (!isExhausted())
-                {
-                    forward();
-                }
-            }
-        }
-
-        private void forward()
-        {
-            try
-            {
-                advance();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        protected String computeNext()
-        {
-            if (isExhausted())
-            {
-                return endOfData();
-            }
-            String oldKey = key;
-            forward();
-            return oldKey;
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.SSTableReader;
+
+import org.apache.log4j.Logger;
+import com.google.common.collect.AbstractIterator;
+
+
+public class FileStruct implements Comparable<FileStruct>, Iterator<String>
+{
+    private static Logger logger = Logger.getLogger(FileStruct.class);
+
+    private String key = null; // decorated!
+    private boolean exhausted = false;
+    private IFileReader reader;
+    private DataInputBuffer bufIn;
+    private DataOutputBuffer bufOut;
+    private SSTableReader sstable;
+    private FileStructIterator iterator;
+
+    FileStruct(SSTableReader sstable) throws IOException
+    {
+        this.reader = SequenceFile.bufferedReader(sstable.getFilename(), 1024 * 1024);
+        this.sstable = sstable;
+        bufIn = new DataInputBuffer();
+        bufOut = new DataOutputBuffer();
+    }
+
+    public String getFileName()
+    {
+        return reader.getFileName();
+    }
+
+    public void close() throws IOException
+    {
+        reader.close();
+    }
+
+    public boolean isExhausted()
+    {
+        return exhausted;
+    }
+
+    public DataInputBuffer getBufIn()
+    {
+        return bufIn;
+    }
+
+    public String getKey()
+    {
+        return key;
+    }
+
+    public int compareTo(FileStruct f)
+    {
+        return sstable.getPartitioner().getDecoratedKeyComparator().compare(key, f.key);
+    }    
+
+    public void seekTo(String seekKey)
+    {
+        try
+        {
+            long position = sstable.getNearestPosition(seekKey);
+            if (position < 0)
+            {
+                exhausted = true;
+                return;
+            }
+            reader.seek(position);
+            advance();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("corrupt sstable", e);
+        }
+    }
+
+    /*
+     * Read the next key from the data file, skipping block indexes.
+     * Caller must check isExhausted after each call to see if further
+     * reads are valid.
+     * Do not mix with calls to the iterator interface (next/hasnext).
+     * @deprecated -- prefer the iterator interface.
+     */
+    public void advance() throws IOException
+    {
+        if (exhausted)
+        {
+            throw new IndexOutOfBoundsException();
+        }
+
+        bufOut.reset();
+        if (reader.isEOF())
+        {
+            reader.close();
+            exhausted = true;
+            return;
+        }
+
+        long bytesread = reader.next(bufOut);
+        if (bytesread == -1)
+        {
+            reader.close();
+            exhausted = true;
+            return;
+        }
+
+        bufIn.reset(bufOut.getData(), bufOut.getLength());
+        key = bufIn.readUTF();
+    }
+
+    public boolean hasNext()
+    {
+        if (iterator == null)
+            iterator = new FileStructIterator();
+        return iterator.hasNext();
+    }
+
+    /** do not mix with manual calls to advance(). */
+    public String next()
+    {
+        if (iterator == null)
+            iterator = new FileStructIterator();
+        return iterator.next();
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    private class FileStructIterator extends AbstractIterator<String>
+    {
+        public FileStructIterator()
+        {
+            if (key == null)
+            {
+                if (!isExhausted())
+                {
+                    forward();
+                }
+            }
+        }
+
+        private void forward()
+        {
+            try
+            {
+                advance();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        protected String computeNext()
+        {
+            if (isExhausted())
+            {
+                return endOfData();
+            }
+            String oldKey = key;
+            forward();
+            return oldKey;
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactSerializer.java Thu Jul 30 15:30:21 2009
@@ -1,48 +1,48 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.DataInputStream;
-
-/**
- * Allows for the controlled serialization/deserialization of a given type.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface ICompactSerializer<T>
-{
-	/**
-     * Serialize the specified type into the specified DataOutputStream instance.
-     * @param t type that needs to be serialized
-     * @param dos DataOutput into which serialization needs to happen.
-     * @throws IOException
-     */
-    public void serialize(T t, DataOutputStream dos) throws IOException;
-
-    /**
-     * Deserialize into the specified DataInputStream instance.
-     * @param dis DataInput from which deserialization needs to happen.
-     * @throws IOException
-     * @return the type that was deserialized
-     */
-    public T deserialize(DataInputStream dis) throws IOException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.DataInputStream;
+
+/**
+ * Allows for the controlled serialization/deserialization of a given type.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface ICompactSerializer<T>
+{
+	/**
+     * Serialize the specified type into the specified DataOutputStream instance.
+     * @param t type that needs to be serialized
+     * @param dos DataOutput into which serialization needs to happen.
+     * @throws IOException
+     */
+    public void serialize(T t, DataOutputStream dos) throws IOException;
+
+    /**
+     * Deserialize into the specified DataInputStream instance.
+     * @param dis DataInput from which deserialization needs to happen.
+     * @throws IOException
+     * @return the type that was deserialized
+     */
+    public T deserialize(DataInputStream dis) throws IOException;
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IFileReader.java Thu Jul 30 15:30:21 2009
@@ -1,83 +1,83 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.io;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.SortedSet;
-
-/**
- * Interface to read from the SequenceFile abstraction.
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public interface IFileReader
-{
-    public String getFileName();
-    public long getEOF() throws IOException;
-    public long getCurrentPosition() throws IOException;
-    public boolean isHealthyFileDescriptor() throws IOException;
-    public void seek(long position) throws IOException;
-    public boolean isEOF() throws IOException;
-
-    /**
-     * Be extremely careful while using this API. This currently
-     * used to read the commit log header from the commit logs.
-     * Treat this as an internal API.
-     * 
-     * @param bytes read into this byte array.
-    */
-    public void readDirect(byte[] bytes) throws IOException;
-    
-    /**
-     * Read a long value from the underlying sub system.
-     * @return value read
-     * @throws IOException
-     */
-    public long readLong() throws IOException;
-        
-    /**
-     * This method dumps the next key/value into the DataOuputStream
-     * passed in.
-     *
-     * @param bufOut DataOutputStream that needs to be filled.
-     * @return number of bytes read.
-     * @throws IOException 
-    */
-    public long next(DataOutputBuffer bufOut) throws IOException;
-
-    /**
-     * This method dumps the next key/value into the DataOuputStream
-     * passed in. Always use this method to query for application
-     * specific data as it will have indexes.
-     *
-     * @param key - key we are interested in.
-     * @param bufOut - DataOutputStream that needs to be filled.
-     * @param columnFamilyName The name of the column family only without the ":"
-     * @param columnNames - The list of columns in the cfName column family
-     * 					     that we want to return
-    */
-    public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> columnNames, long position) throws IOException;
-
-    /**
-     * Close the file after reading.
-     * @throws IOException
-     */
-    public void close() throws IOException;
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.SortedSet;
+
+/**
+ * Interface to read from the SequenceFile abstraction.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFileReader
+{
+    public String getFileName();
+    public long getEOF() throws IOException;
+    public long getCurrentPosition() throws IOException;
+    public boolean isHealthyFileDescriptor() throws IOException;
+    public void seek(long position) throws IOException;
+    public boolean isEOF() throws IOException;
+
+    /**
+     * Be extremely careful while using this API. This currently
+     * used to read the commit log header from the commit logs.
+     * Treat this as an internal API.
+     * 
+     * @param bytes read into this byte array.
+    */
+    public void readDirect(byte[] bytes) throws IOException;
+    
+    /**
+     * Read a long value from the underlying sub system.
+     * @return value read
+     * @throws IOException
+     */
+    public long readLong() throws IOException;
+        
+    /**
+     * This method dumps the next key/value into the DataOuputStream
+     * passed in.
+     *
+     * @param bufOut DataOutputStream that needs to be filled.
+     * @return number of bytes read.
+     * @throws IOException 
+    */
+    public long next(DataOutputBuffer bufOut) throws IOException;
+
+    /**
+     * This method dumps the next key/value into the DataOuputStream
+     * passed in. Always use this method to query for application
+     * specific data as it will have indexes.
+     *
+     * @param key - key we are interested in.
+     * @param bufOut - DataOutputStream that needs to be filled.
+     * @param columnFamilyName The name of the column family only without the ":"
+     * @param columnNames - The list of columns in the cfName column family
+     * 					     that we want to return
+    */
+    public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, SortedSet<byte[]> columnNames, long position) throws IOException;
+
+    /**
+     * Close the file after reading.
+     * @throws IOException
+     */
+    public void close() throws IOException;
+}