You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC
svn commit: r749205 [14/16] - in
/incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/
config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/
cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/...
Added: incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/DataOutputBuffer.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
+import org.apache.cassandra.continuations.Suspendable;
+
+
+/**
+ * A {@link DataOutputStream} that writes into a growable, in-memory byte array.
+ * <p>
+ * The backing array is exposed without copying through {@link #getData()};
+ * only its first {@link #getLength()} bytes are valid. No method of this class
+ * or of its nested streams is synchronized, so an instance must not be shared
+ * between threads without external locking.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class DataOutputBuffer extends DataOutputStream
+{
+    /**
+     * A clone of {@link java.io.ByteArrayOutputStream} without the
+     * <code>synchronized</code> keyword on its methods.
+     */
+    public static class FastByteArrayOutputStream extends OutputStream
+    {
+        /**
+         * The buffer where data is stored.
+         */
+        protected byte buf[];
+
+        /**
+         * The number of valid bytes in the buffer.
+         */
+        protected int count;
+
+        /**
+         * Creates a new byte array output stream. The buffer capacity is
+         * initially 32 bytes, though its size increases if necessary.
+         */
+        public FastByteArrayOutputStream()
+        {
+            this(32);
+        }
+
+        /**
+         * Creates a new byte array output stream, with a buffer capacity of the
+         * specified size, in bytes.
+         *
+         * @param size
+         *            the initial size.
+         * @exception IllegalArgumentException
+         *                if size is negative.
+         */
+        public FastByteArrayOutputStream(int size)
+        {
+            if (size < 0)
+            {
+                throw new IllegalArgumentException("Negative initial size: "
+                        + size);
+            }
+            buf = new byte[size];
+        }
+
+        /**
+         * Grows <code>buf</code>, if needed, so that it can hold at least
+         * <code>minCapacity</code> bytes. The new capacity is the larger of
+         * twice the current capacity and <code>minCapacity</code> (if doubling
+         * overflows, <code>minCapacity</code> wins). Existing contents are
+         * carried over. All write methods below share this growth policy.
+         *
+         * @param minCapacity
+         *            the number of bytes the buffer must be able to hold.
+         */
+        protected void ensureCapacity(int minCapacity)
+        {
+            if (minCapacity > buf.length)
+            {
+                buf = Arrays.copyOf(buf, Math.max(buf.length << 1, minCapacity));
+            }
+        }
+
+        /**
+         * Writes the specified byte to this byte array output stream.
+         *
+         * @param b
+         *            the byte to be written.
+         */
+        public void write(int b)
+        {
+            ensureCapacity(count + 1);
+            buf[count] = (byte) b;
+            count++;
+        }
+
+        /**
+         * Writes <code>len</code> bytes from the specified byte array
+         * starting at offset <code>off</code> to this byte array output
+         * stream.
+         *
+         * @param b
+         *            the data.
+         * @param off
+         *            the start offset in the data.
+         * @param len
+         *            the number of bytes to write.
+         * @exception IndexOutOfBoundsException
+         *                if <code>off</code>/<code>len</code> do not describe
+         *                a valid range of <code>b</code>.
+         */
+        public void write(byte b[], int off, int len)
+        {
+            // The last clause catches int overflow of off + len.
+            if ((off < 0) || (off > b.length) || (len < 0)
+                    || ((off + len) > b.length) || ((off + len) < 0))
+            {
+                throw new IndexOutOfBoundsException();
+            }
+            if (len == 0)
+            {
+                return;
+            }
+            ensureCapacity(count + len);
+            System.arraycopy(b, off, buf, count, len);
+            count += len;
+        }
+
+        /**
+         * Writes the complete contents of this byte array output stream to the
+         * specified output stream argument, as if by calling the output
+         * stream's write method using <code>out.write(buf, 0, count)</code>.
+         *
+         * @param out
+         *            the output stream to which to write the data.
+         * @exception IOException
+         *                if an I/O error occurs.
+         */
+        public void writeTo(OutputStream out) throws IOException
+        {
+            out.write(buf, 0, count);
+        }
+
+        /**
+         * Resets the <code>count</code> field of this byte array output
+         * stream to zero, so that all currently accumulated output in the
+         * output stream is discarded. The output stream can be used again,
+         * reusing the already allocated buffer space.
+         *
+         * @see java.io.ByteArrayOutputStream#reset()
+         */
+        public void reset()
+        {
+            count = 0;
+        }
+
+        /**
+         * Creates a newly allocated byte array. Its size is the current size of
+         * this output stream and the valid contents of the buffer have been
+         * copied into it.
+         *
+         * @return the current contents of this output stream, as a byte array.
+         * @see java.io.ByteArrayOutputStream#size()
+         */
+        public byte[] toByteArray()
+        {
+            return Arrays.copyOf(buf, count);
+        }
+
+        /**
+         * Returns the current size of the buffer.
+         *
+         * @return the value of the <code>count</code> field, which is the
+         *         number of valid bytes in this output stream.
+         * @see java.io.ByteArrayOutputStream#size()
+         */
+        public int size()
+        {
+            return count;
+        }
+
+        /**
+         * Converts the buffer's contents into a string decoding bytes using the
+         * platform's default character set. The length of the new
+         * <tt>String</tt> is a function of the character set, and hence may
+         * not be equal to the size of the buffer.
+         * <p>
+         * Malformed-input and unmappable-character sequences are replaced with
+         * the default replacement string for the platform's default character
+         * set. Use {@link java.nio.charset.CharsetDecoder} when more control
+         * over the decoding process is required.
+         *
+         * @return String decoded from the buffer's contents.
+         */
+        public String toString()
+        {
+            return new String(buf, 0, count);
+        }
+
+        /**
+         * Converts the buffer's contents into a string by decoding the bytes
+         * using the named {@link java.nio.charset.Charset charset}. The length
+         * of the new <tt>String</tt> is a function of the charset, and hence
+         * may not be equal to the length of the byte array.
+         * <p>
+         * Malformed-input and unmappable-character sequences are replaced with
+         * this charset's default replacement string. Use
+         * {@link java.nio.charset.CharsetDecoder} when more control over the
+         * decoding process is required.
+         *
+         * @param charsetName
+         *            the name of a supported
+         *            {@linkplain java.nio.charset.Charset charset}
+         * @return String decoded from the buffer's contents.
+         * @exception UnsupportedEncodingException
+         *                If the named charset is not supported
+         */
+        public String toString(String charsetName) throws UnsupportedEncodingException
+        {
+            return new String(buf, 0, count, charsetName);
+        }
+
+        /**
+         * Creates a newly allocated string whose characters are built from the
+         * valid bytes of the buffer as
+         * <code>c == (char) (((hibyte &amp; 0xff) &lt;&lt; 8) | (b &amp; 0xff))</code>.
+         *
+         * @deprecated This method does not properly convert bytes into
+         *             characters. Use {@link #toString(String)} or
+         *             {@link #toString()} instead.
+         *
+         * @param hibyte
+         *            the high byte of each resulting Unicode character.
+         * @return the current contents of the output stream, as a string.
+         */
+        @Deprecated
+        public String toString(int hibyte)
+        {
+            return new String(buf, hibyte, 0, count);
+        }
+
+        /**
+         * Closing this stream has no effect. The methods in this class can be
+         * called after the stream has been closed without generating an
+         * <tt>IOException</tt>.
+         */
+        public void close() throws IOException
+        {
+            // Intentionally empty: there is no underlying resource to release.
+        }
+    }
+
+    /**
+     * The stream that backs a DataOutputBuffer. It adds raw access to the
+     * backing array and bulk appends from a DataInput or a ByteBuffer.
+     */
+    private static class Buffer extends FastByteArrayOutputStream
+    {
+        /** Returns the backing array itself (not a copy). */
+        public byte[] getData()
+        {
+            return buf;
+        }
+
+        /** Returns the number of valid bytes at the front of the backing array. */
+        public int getLength()
+        {
+            return count;
+        }
+
+        /**
+         * Reads exactly <code>len</code> bytes from <code>in</code> and appends
+         * them. The length is only advanced after <code>readFully</code> returns.
+         *
+         * @throws IndexOutOfBoundsException if <code>len</code> is negative
+         * @throws IOException if <code>in.readFully</code> fails
+         */
+        public void write(DataInput in, int len) throws IOException
+        {
+            checkLength(len);
+            ensureCapacity(count + len);
+            in.readFully(buf, count, len);
+            count += len;
+        }
+
+        /**
+         * Copies the next <code>len</code> bytes of <code>buffer</code> (from
+         * its current position) and appends them. The length is only advanced
+         * after the copy succeeds.
+         *
+         * @throws IndexOutOfBoundsException if <code>len</code> is negative
+         * @throws java.nio.BufferUnderflowException if fewer than
+         *             <code>len</code> bytes remain in <code>buffer</code>
+         */
+        public void write(ByteBuffer buffer, int len) throws IOException
+        {
+            checkLength(len);
+            ensureCapacity(count + len);
+            buffer.get(buf, count, len);
+            count += len;
+        }
+
+        /**
+         * Rejects negative lengths up front. Otherwise a DataInput that
+         * tolerated them could make <code>count</code> go backwards.
+         */
+        private static void checkLength(int len)
+        {
+            if (len < 0)
+            {
+                throw new IndexOutOfBoundsException("Negative length: " + len);
+            }
+        }
+    }
+
+    /** The stream all DataOutputStream writes of this instance are funnelled into. */
+    private final Buffer buffer;
+
+    /** Constructs a new empty buffer. */
+    public DataOutputBuffer()
+    {
+        this(new Buffer());
+    }
+
+    private DataOutputBuffer(Buffer buffer)
+    {
+        super(buffer);
+        this.buffer = buffer;
+    }
+
+    /**
+     * Returns the current backing array of the buffer (not a copy). Data is
+     * only valid to {@link #getLength()}.
+     */
+    public byte[] getData()
+    {
+        return buffer.getData();
+    }
+
+    /** Returns the length of the valid data currently in the buffer. */
+    public int getLength()
+    {
+        return buffer.getLength();
+    }
+
+    /**
+     * Resets the buffer to empty, keeping the allocated array. It also zeroes
+     * the inherited <code>written</code> counter so that {@link #size()} agrees.
+     *
+     * @return this instance, for chaining.
+     */
+    public DataOutputBuffer reset()
+    {
+        this.written = 0;
+        buffer.reset();
+        return this;
+    }
+
+    /**
+     * Reads exactly <code>length</code> bytes from a DataInput directly into
+     * the buffer.
+     */
+    public void write(DataInput in, int length) throws IOException
+    {
+        buffer.write(in, length);
+    }
+
+    /** Copies <code>length</code> bytes from a ByteBuffer directly into the buffer. */
+    public void write(ByteBuffer in, int length) throws IOException
+    {
+        buffer.write(in, length);
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/FastBufferedInputStream.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * A buffered input stream with <code>mark</code>/<code>reset</code> support.
+ * It is a copy of {@link java.io.BufferedInputStream} whose methods are not
+ * <code>synchronized</code>, so an instance should be used from one thread.
+ * The exception is {@link #close()}, which is written to tolerate being called
+ * asynchronously: it atomically nulls out <code>buf</code>, and later
+ * operations that need the buffer fail with "Stream closed".
+ * <p>
+ * As bytes are read or skipped, the internal buffer is refilled as necessary
+ * from the contained input stream, many bytes at a time. The <code>mark</code>
+ * operation remembers a point in the input stream, and the <code>reset</code>
+ * operation causes all the bytes read since the most recent <code>mark</code>
+ * to be reread before new bytes are taken from the contained input stream.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedInputStream extends FilterInputStream
+{
+    /** Buffer size used by {@link #FastBufferedInputStream(InputStream)}. */
+    private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+    /**
+     * The internal buffer array where the data is stored. When necessary, it
+     * may be replaced by another array of a different size. A null value means
+     * the stream has been closed.
+     */
+    protected volatile byte buf[];
+
+    /**
+     * Atomic updater to provide compareAndSet for buf. This is necessary
+     * because closes can be asynchronous. We use nullness of buf[] as primary
+     * indicator that this stream is closed. (The "in" field is also nulled out
+     * on close.)
+     */
+    private static final AtomicReferenceFieldUpdater<FastBufferedInputStream, byte[]> bufUpdater =
+            AtomicReferenceFieldUpdater.newUpdater(FastBufferedInputStream.class, byte[].class, "buf");
+
+    /**
+     * The index one greater than the index of the last valid byte in the
+     * buffer. Always in the range <code>0</code> through <code>buf.length</code>.
+     * Elements <code>buf[0]</code> through <code>buf[count-1]</code> hold
+     * buffered input obtained from the underlying input stream.
+     */
+    protected int count;
+
+    /**
+     * The index of the next byte to be supplied from <code>buf</code>. Always
+     * in the range <code>0</code> through <code>count</code>. When it equals
+     * <code>count</code>, the next read or skip must fetch more bytes from the
+     * contained input stream.
+     */
+    protected int pos;
+
+    /**
+     * The value of <code>pos</code> when <code>mark</code> was last called, or
+     * <code>-1</code> if there is no valid mark. While a mark is set, the bytes
+     * from <code>buf[markpos]</code> through <code>buf[pos-1]</code> are kept,
+     * possibly shifted to the front of the buffer. They are kept until the
+     * buffer would have to grow past <code>marklimit</code>.
+     */
+    protected int markpos = -1;
+
+    /**
+     * The read-ahead limit passed to the last <code>mark</code> call. Once the
+     * buffer is full and at least this large, the mark is dropped instead of
+     * growing the buffer further.
+     */
+    protected int marklimit;
+
+    /**
+     * Returns the underlying input stream, or throws if it has been nulled out
+     * by close.
+     */
+    private InputStream getInIfOpen() throws IOException
+    {
+        InputStream input = in;
+        if (input == null)
+            throw new IOException("Stream closed");
+        return input;
+    }
+
+    /**
+     * Returns the buffer, or throws if it has been nulled out by close.
+     */
+    private byte[] getBufIfOpen() throws IOException
+    {
+        byte[] buffer = buf;
+        if (buffer == null)
+            throw new IOException("Stream closed");
+        return buffer;
+    }
+
+    /**
+     * Creates a stream over <code>in</code> with a buffer of
+     * {@value #DEFAULT_BUFFER_SIZE} bytes.
+     *
+     * @param in
+     *            the underlying input stream.
+     */
+    public FastBufferedInputStream(InputStream in)
+    {
+        this(in, DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a stream over <code>in</code> with an internal buffer of
+     * <code>size</code> bytes.
+     *
+     * @param in
+     *            the underlying input stream.
+     * @param size
+     *            the buffer size.
+     * @exception IllegalArgumentException
+     *                if size &lt;= 0.
+     */
+    public FastBufferedInputStream(InputStream in, int size)
+    {
+        super(in);
+        if (size <= 0)
+        {
+            throw new IllegalArgumentException("Buffer size <= 0");
+        }
+        buf = new byte[size];
+    }
+
+    /**
+     * Fills the buffer with more data, taking into account shuffling and other
+     * tricks for dealing with marks. Callers invoke it only once all buffered
+     * data has been consumed (<code>pos &gt;= count</code>). If the contained
+     * stream returns no bytes, <code>count</code> ends up equal to
+     * <code>pos</code>.
+     *
+     * @throws IOException if the stream is (or concurrently becomes) closed,
+     *             or the contained stream fails.
+     */
+    private void fill() throws IOException
+    {
+        byte[] buffer = getBufIfOpen();
+        if (markpos < 0)
+        {
+            pos = 0; // no mark: throw away the buffer
+        }
+        else if (pos >= buffer.length)
+        {
+            // no room left in buffer
+            if (markpos > 0)
+            {
+                // can throw away the part of the buffer before the mark
+                int sz = pos - markpos;
+                System.arraycopy(buffer, markpos, buffer, 0, sz);
+                pos = sz;
+                markpos = 0;
+            }
+            else if (buffer.length >= marklimit)
+            {
+                markpos = -1; // buffer got too big, invalidate mark
+                pos = 0; // drop buffer contents
+            }
+            else
+            {
+                // Grow the buffer: double it, capped at marklimit. Here
+                // pos == buffer.length < marklimit, so "marklimit - pos" cannot
+                // overflow, and the test avoids computing an overflowing pos * 2.
+                int nsz = (pos <= marklimit - pos) ? pos * 2 : marklimit;
+                byte nbuf[] = new byte[nsz];
+                System.arraycopy(buffer, 0, nbuf, 0, pos);
+                if (!bufUpdater.compareAndSet(this, buffer, nbuf))
+                {
+                    // Can't replace buf if there was an async close.
+                    // Note: This would need to be changed if fill()
+                    // is ever made accessible to multiple threads.
+                    // But for now, the only way CAS can fail is via close.
+                    throw new IOException("Stream closed");
+                }
+                buffer = nbuf;
+            }
+        }
+        count = pos;
+        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
+        if (n > 0)
+            count = n + pos;
+    }
+
+    /**
+     * See the general contract of the <code>read</code> method of
+     * <code>InputStream</code>.
+     *
+     * @return the next byte of data, or <code>-1</code> if the end of the
+     *         stream is reached.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     */
+    public int read() throws IOException
+    {
+        if (pos >= count)
+        {
+            fill();
+            if (pos >= count)
+                return -1;
+        }
+        return getBufIfOpen()[pos++] & 0xff;
+    }
+
+    /**
+     * Reads bytes into a portion of an array, reading from the underlying
+     * stream at most once if necessary.
+     */
+    private int read1(byte[] b, int off, int len) throws IOException
+    {
+        int avail = count - pos;
+        if (avail <= 0)
+        {
+            /*
+             * If the requested length is at least as large as the buffer, and
+             * if there is no mark/reset activity, do not bother to copy the
+             * bytes into the local buffer. In this way buffered streams will
+             * cascade harmlessly.
+             */
+            if (len >= getBufIfOpen().length && markpos < 0)
+            {
+                return getInIfOpen().read(b, off, len);
+            }
+            fill();
+            avail = count - pos;
+            if (avail <= 0)
+                return -1;
+        }
+        int cnt = (avail < len) ? avail : len;
+        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
+        pos += cnt;
+        return cnt;
+    }
+
+    /**
+     * Reads up to <code>len</code> bytes into <code>b</code> starting at
+     * <code>off</code>. Like {@link java.io.BufferedInputStream}, it keeps
+     * reading until <code>len</code> bytes have been read, the underlying
+     * stream signals end-of-file, or the underlying stream's
+     * <code>available()</code> reports zero.
+     *
+     * @param b
+     *            destination buffer.
+     * @param off
+     *            offset at which to start storing bytes.
+     * @param len
+     *            maximum number of bytes to read.
+     * @return the number of bytes read, or <code>-1</code> if the end of the
+     *         stream has been reached before any byte was read.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     */
+    public int read(byte b[], int off, int len) throws IOException
+    {
+        getBufIfOpen(); // Check for closed stream
+        // A negative result means off, len or off+len is negative, or the range overruns b.
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0)
+        {
+            throw new IndexOutOfBoundsException();
+        }
+        else if (len == 0)
+        {
+            return 0;
+        }
+
+        int n = 0;
+        for (;;)
+        {
+            int nread = read1(b, off + n, len - n);
+            if (nread <= 0)
+                return (n == 0) ? nread : n;
+            n += nread;
+            if (n >= len)
+                return n;
+            // if not closed but no bytes available, return
+            InputStream input = in;
+            if (input != null && input.available() <= 0)
+                return n;
+        }
+    }
+
+    /**
+     * See the general contract of the <code>skip</code> method of
+     * <code>InputStream</code>. Skips within the buffer when data is buffered.
+     * Otherwise it delegates to the underlying stream, unless a mark must be
+     * preserved.
+     *
+     * @exception IOException
+     *                if the stream does not support seek, or if this input
+     *                stream has been closed by invoking its {@link #close()}
+     *                method, or an I/O error occurs.
+     */
+    public long skip(long n) throws IOException
+    {
+        getBufIfOpen(); // Check for closed stream
+        if (n <= 0)
+        {
+            return 0;
+        }
+        long avail = count - pos;
+
+        if (avail <= 0)
+        {
+            // If no mark position set then don't keep in buffer
+            if (markpos < 0)
+                return getInIfOpen().skip(n);
+
+            // Fill in buffer to save bytes for reset
+            fill();
+            avail = count - pos;
+            if (avail <= 0)
+                return 0;
+        }
+
+        long skipped = (avail < n) ? avail : n;
+        pos += skipped;
+        return skipped;
+    }
+
+    /**
+     * Returns an estimate of the number of bytes that can be read (or skipped
+     * over) without blocking. The estimate is the number of bytes remaining in
+     * the buffer (<code>count - pos</code>) plus the underlying stream's
+     * <code>available()</code>, clamped to <code>Integer.MAX_VALUE</code>
+     * instead of overflowing to a negative value.
+     *
+     * @return an estimate of the number of bytes that can be read (or skipped
+     *         over) from this input stream without blocking.
+     * @exception IOException
+     *                if this input stream has been closed by invoking its
+     *                {@link #close()} method, or an I/O error occurs.
+     */
+    public int available() throws IOException
+    {
+        int underlying = getInIfOpen().available();
+        int buffered = count - pos;
+        return (underlying > Integer.MAX_VALUE - buffered)
+                ? Integer.MAX_VALUE
+                : underlying + buffered;
+    }
+
+    /**
+     * Records the current position so that {@link #reset()} can return to it,
+     * as long as no more than <code>readlimit</code> bytes must be buffered.
+     *
+     * @param readlimit
+     *            the maximum limit of bytes that can be read before the mark
+     *            position becomes invalid.
+     */
+    public void mark(int readlimit)
+    {
+        marklimit = readlimit;
+        markpos = pos;
+    }
+
+    /**
+     * Repositions the stream to the last mark.
+     *
+     * @exception IOException
+     *                if this stream has not been marked, the mark has been
+     *                invalidated, or the stream has been closed.
+     */
+    public void reset() throws IOException
+    {
+        getBufIfOpen(); // Cause exception if closed
+        if (markpos < 0)
+            throw new IOException("Resetting to invalid mark");
+        pos = markpos;
+    }
+
+    /**
+     * This stream supports <code>mark</code> and <code>reset</code>.
+     *
+     * @return <code>true</code>
+     */
+    public boolean markSupported()
+    {
+        return true;
+    }
+
+    /**
+     * Closes this input stream and the underlying stream. Afterwards read(),
+     * available(), reset() and skip() throw an IOException. Closing a
+     * previously closed stream has no effect.
+     *
+     * @exception IOException
+     *                if closing the underlying stream fails.
+     */
+    public void close() throws IOException
+    {
+        byte[] buffer;
+        while ((buffer = buf) != null)
+        {
+            if (bufUpdater.compareAndSet(this, buffer, null))
+            {
+                InputStream input = in;
+                in = null;
+                if (input != null)
+                    input.close();
+                return;
+            }
+            // Else retry in case a new buf was CASed in fill()
+        }
+    }
+}
Added: incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/FastBufferedOutputStream.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.*;
+
+/**
+ * The class implements a buffered output stream. By setting up such an output
+ * stream, an application can write bytes to the underlying output stream
+ * without necessarily causing a call to the underlying system for each byte
+ * written.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class FastBufferedOutputStream extends FilterOutputStream
+{
+ /**
+ * The internal buffer where data is stored.
+ */
+ protected byte buf[];
+
+ /**
+ * The number of valid bytes in the buffer. This value is always in the
+ * range <tt>0</tt> through <tt>buf.length</tt>; elements
+ * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte
+ * data.
+ */
+ protected int count;
+
+ /**
+ * Creates a new buffered output stream to write data to the specified
+ * underlying output stream.
+ *
+ * @param out
+ * the underlying output stream.
+ */
+ public FastBufferedOutputStream(OutputStream out)
+ {
+ this(out, 8192);
+ }
+
+ /**
+ * Creates a new buffered output stream to write data to the specified
+ * underlying output stream with the specified buffer size.
+ *
+ * @param out
+ * the underlying output stream.
+ * @param size
+ * the buffer size.
+ * @exception IllegalArgumentException
+ * if size <= 0.
+ */
+ public FastBufferedOutputStream(OutputStream out, int size)
+ {
+ super(out);
+ if (size <= 0)
+ {
+ throw new IllegalArgumentException("Buffer size <= 0");
+ }
+ buf = new byte[size];
+ }
+
+ /** Flush the internal buffer */
+ private void flushBuffer() throws IOException
+ {
+ if (count > 0)
+ {
+ out.write(buf, 0, count);
+ count = 0;
+ }
+ }
+
+ /**
+ * Writes the specified byte to this buffered output stream.
+ *
+ * @param b
+ * the byte to be written.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(int b) throws IOException
+ {
+ if (count >= buf.length)
+ {
+ flushBuffer();
+ }
+ buf[count++] = (byte) b;
+ }
+
+ /**
+ * Writes <code>len</code> bytes from the specified byte array starting at
+ * offset <code>off</code> to this buffered output stream.
+ *
+ * <p>
+ * Ordinarily this method stores bytes from the given array into this
+ * stream's buffer, flushing the buffer to the underlying output stream as
+ * needed. If the requested length is at least as large as this stream's
+ * buffer, however, then this method will flush the buffer and write the
+ * bytes directly to the underlying output stream. Thus redundant
+ * <code>BufferedOutputStream</code>s will not copy data unnecessarily.
+ *
+ * @param b
+ * the data.
+ * @param off
+ * the start offset in the data.
+ * @param len
+ * the number of bytes to write.
+ * @exception IOException
+ * if an I/O error occurs.
+ */
+ public void write(byte b[], int off, int len)
+ throws IOException
+ {
+ if (len >= buf.length)
+ {
+ /*
+ * If the request length exceeds the size of the output buffer,
+ * flush the output buffer and then write the data directly. In this
+ * way buffered streams will cascade harmlessly.
+ */
+ flushBuffer();
+ out.write(b, off, len);
+ return;
+ }
+ if (len > buf.length - count)
+ {
+ flushBuffer();
+ }
+ System.arraycopy(b, off, buf, count, len);
+ count += len;
+ }
+
+ /**
+ * Flushes this buffered output stream. This forces any buffered output
+ * bytes to be written out to the underlying output stream.
+ *
+ * @exception IOException
+ * if an I/O error occurs.
+ * @see java.io.FilterOutputStream#out
+ */
+ public void flush() throws IOException
+ {
+ flushBuffer();
+ out.flush();
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/ICompactSerializer.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.DataInputStream;
+
/**
 * Contract for types that know how to write instances of {@code T} to a
 * {@link DataOutputStream} and read them back from a {@link DataInputStream}.
 * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
 *
 * @param <T> the type handled by this serializer
 */
public interface ICompactSerializer<T>
{
    /**
     * Writes the serialized form of {@code t} to {@code dos}.
     *
     * @param t   the object to serialize
     * @param dos the stream that receives the serialized bytes
     * @throws IOException if writing to {@code dos} fails
     */
    void serialize(T t, DataOutputStream dos) throws IOException;

    /**
     * Reads one serialized {@code T} from {@code dis}.
     *
     * @param dis the stream holding the serialized bytes
     * @return the reconstructed object
     * @throws IOException if reading from {@code dis} fails
     */
    T deserialize(DataInputStream dis) throws IOException;
}
Added: incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/IFileReader.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.List;
+
+/**
+ * Interface to read from the SequenceFile abstraction.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFileReader
+{
+ public String getFileName();
+ public long getEOF() throws IOException;
+ public long getCurrentPosition() throws IOException;
+ public boolean isHealthyFileDescriptor() throws IOException;
+ public void seek(long position) throws IOException;
+ public boolean isEOF() throws IOException;
+
+ /**
+ * Be extremely careful while using this API. This currently
+ * used to read the commit log header from the commit logs.
+ * Treat this as an internal API.
+ *
+ * @param bytes read into this byte array.
+ */
+ public void readDirect(byte[] bytes) throws IOException;
+
+ /**
+ * Read a long value from the underlying sub system.
+ * @return value read
+ * @throws IOException
+ */
+ public long readLong() throws IOException;
+
+ /**
+ * This functions is used to help out subsequent reads
+ * on the specified key. It reads the keys prior to this
+ * one on disk so that the buffer cache is hot.
+ *
+ * @param key key for which we are performing the touch.
+ * @param fData true implies we fetch the data into buffer cache.
+ */
+ public long touch(String key , boolean fData) throws IOException;
+
+ /**
+ * This method helps is retrieving the offset of the specified
+ * key in the file using the block index.
+ *
+ * @param key key whose position we need in the block index.
+ */
+ public long getPositionFromBlockIndex(String key) throws IOException;
+
+ /**
+ * This method returns the position of the specified key and the
+ * size of its data segment from the block index.
+ *
+ * @param key key whose block metadata we are interested in.
+ * @return an instance of the block metadata for this key.
+ */
+ public SSTable.BlockMetadata getBlockMetadata(String key) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOuputStream
+ * passed in.
+ *
+ * @param dos - DataOutputStream that needs to be filled.
+ * @return number of bytes read.
+ * @throws IOException
+ */
+ public long next(DataOutputBuffer bufOut) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOuputStream
+ * passed in.
+ *
+ * @param key key we are interested in.
+ * @param dos DataOutputStream that needs to be filled.
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return the number of bytes read.
+ */
+ public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOuputStream
+ * passed in.
+ *
+ * @param key key we are interested in.
+ * @param dos DataOutputStream that needs to be filled.
+ * @param column name of the column in our format.
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return number of bytes that were read.
+ */
+ public long next(String key, DataOutputBuffer bufOut, String column, Coordinate section) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOuputStream
+ * passed in. Always use this method to query for application
+ * specific data as it will have indexes.
+ *
+ * @param key - key we are interested in.
+ * @param dos - DataOutputStream that needs to be filled.
+ * @param cfName - The name of the column family only without the ":"
+ * @param columnNames - The list of columns in the cfName column family
+ * that we want to return
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return number of bytes read.
+ *
+ */
+ public long next(String key, DataOutputBuffer bufOut, String columnFamilyName, List<String> columnNames, Coordinate section) throws IOException;
+
+ /**
+ * This method dumps the next key/value into the DataOuputStream
+ * passed in.
+ *
+ * @param key key we are interested in.
+ * @param dos DataOutputStream that needs to be filled.
+ * @param column name of the column in our format.
+ * @param timeRange time range we are interested in.
+ * @param section region of the file that needs to be read
+ * @throws IOException
+ * @return number of bytes that were read.
+ */
+ public long next(String key, DataOutputBuffer bufOut, String column, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException;
+
+ /**
+ * Close the file after reading.
+ * @throws IOException
+ */
+ public void close() throws IOException;
+}
Added: incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/IFileWriter.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.cassandra.db.PrimaryKey;
+
+
+/**
+ * An interface for writing into the SequenceFile abstraction.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFileWriter
+{
+ /**
+ * Get the current position of the file pointer.
+ * @return current file pointer position
+ * @throws IOException
+ */
+ public long getCurrentPosition() throws IOException;
+
+ /**
+ * @return the last file modification time.
+ */
+ public long lastModified();
+
+ /**
+ * Seeks the file pointer to the specified position.
+ * @param position position within the file to seek to.
+ * @throws IOException
+ */
+ public void seek(long position) throws IOException;
+
+ /**
+ * Appends the buffer to the the underlying SequenceFile.
+ * @param buffer buffer which contains the serialized data.
+ * @throws IOException
+ */
+ public void append(DataOutputBuffer buffer) throws IOException;
+
+ /**
+ * Appends the key and the value to the the underlying SequenceFile.
+ * @param keyBuffer buffer which contains the serialized key.
+ * @param buffer buffer which contains the serialized data.
+ * @throws IOException
+ */
+ public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException;
+
+ /**
+ * Appends the key and the value to the the underlying SequenceFile.
+ * @param key key associated with this peice of data.
+ * @param buffer buffer containing the serialized data.
+ * @throws IOException
+ */
+ public void append(String key, DataOutputBuffer buffer) throws IOException;
+
+ /**
+ * Appends the key and the value to the the underlying SequenceFile.
+ * @param key key associated with this peice of data.
+ * @param value byte array containing the serialized data.
+ * @throws IOException
+ */
+ public void append(String key, byte[] value) throws IOException;
+
+ /**
+ * Appends the key and the long value to the the underlying SequenceFile.
+ * This is used in the contruction of the index file associated with a
+ * SSTable.
+ * @param key key associated with this peice of data.
+ * @param value value associated with this key.
+ * @throws IOException
+ */
+ public void append(String key, long value) throws IOException;
+
+ /**
+ * Be extremely careful while using this API. This currently
+ * used to write the commit log header in the commit logs.
+ * If not used carefully it could completely screw up reads
+ * of other key/value pairs that are written.
+ *
+ * @param bytes serialized version of the commit log header.
+ * @throws IOException
+ */
+ public long writeDirect(byte[] bytes) throws IOException;
+
+ /**
+ * Write a long into the underlying sub system.
+ * @param value long to be written
+ * @throws IOException
+ */
+ public void writeLong(long value) throws IOException;
+
+ /**
+ * Close the file which is being used for the write.
+ * @throws IOException
+ */
+ public void close() throws IOException;
+
+ /**
+ * Close the file after appending the passed in footer information.
+ * @param footer footer information.
+ * @param size size of the footer.
+ * @throws IOException
+ */
+ public void close(byte[] footer, int size) throws IOException;
+
+ /**
+ * @return the name of the file.
+ */
+ public String getFileName();
+
+ /**
+ * @return the size of the file.
+ * @throws IOException
+ */
+ public long getFileSize() throws IOException;
+}
Added: incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/io/IndexHelper.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.TypeInfo;
+import org.apache.cassandra.io.SSTable.KeyPositionInfo;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+/**
+ * Provides helper to serialize, deserialize and use column indexes.
+ * Author : Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+
+public class IndexHelper
+{
+ /**
+ * Serializes a column index to a data output stream
+ * @param indexSizeInBytes Size of index to be written
+ * @param columnIndexList List of column index entries as objects
+ * @param dos the output stream into which the column index is to be written
+ * @throws IOException
+ */
+ public static void serialize(int indexSizeInBytes, List<ColumnIndexInfo> columnIndexList, DataOutputStream dos) throws IOException
+ {
+ /* if we have no data to index, the write that there is no index present */
+ if(indexSizeInBytes == 0 || columnIndexList == null || columnIndexList.size() == 0)
+ {
+ dos.writeBoolean(false);
+ }
+ else
+ {
+ /* write if we are storing a column index */
+ dos.writeBoolean(true);
+ /* write the size of the index */
+ dos.writeInt(indexSizeInBytes);
+ for( ColumnIndexInfo cIndexInfo : columnIndexList )
+ {
+ cIndexInfo.serialize(dos);
+ }
+ }
+ }
+
+ /**
+ * Skip the bloom filter and the index and return the bytes read.
+ * @param in the data input from which the bloom filter and index
+ * should be skipped
+ * @return number of bytes read.
+ * @throws IOException
+ */
+ public static int skipBloomFilterAndIndex(DataInput in) throws IOException
+ {
+ int totalBytesRead = 0;
+ /* size of the bloom filter */
+ int size = in.readInt();
+ totalBytesRead += 4;
+ /* skip the serialized bloom filter */
+ in.skipBytes(size);
+ totalBytesRead += size;
+ /* skip the index on disk */
+ /* read if the file has column indexes */
+ boolean hasColumnIndexes = in.readBoolean();
+ totalBytesRead += 1;
+ if ( hasColumnIndexes )
+ {
+ totalBytesRead += skipIndex(in);
+ }
+ return totalBytesRead;
+ }
+
+ /**
+ * Skip the bloom filter and return the bytes read.
+ * @param in the data input from which the bloom filter
+ * should be skipped
+ * @return number of bytes read.
+ * @throws IOException
+ */
+ public static int skipBloomFilter(DataInput in) throws IOException
+ {
+ int totalBytesRead = 0;
+ /* size of the bloom filter */
+ int size = in.readInt();
+ totalBytesRead += 4;
+ /* skip the serialized bloom filter */
+ in.skipBytes(size);
+ totalBytesRead += size;
+ return totalBytesRead;
+ }
+
+ /**
+ * Skip the index and return the number of bytes read.
+ * @param file the data input from which the index should be skipped
+ * @return number of bytes read from the data input
+ * @throws IOException
+ */
+ public static int skipIndex(DataInput file) throws IOException
+ {
+ /* read only the column index list */
+ int columnIndexSize = file.readInt();
+ int totalBytesRead = 4;
+
+ /* skip the column index data */
+ file.skipBytes(columnIndexSize);
+ totalBytesRead += columnIndexSize;
+
+ return totalBytesRead;
+ }
+
+ /**
+ * Deserialize the index into a structure and return the number of bytes read.
+ * @param in Input from which the serialized form of the index is read
+ * @param columnIndexList the structure which is filled in with the deserialized index
+ * @return number of bytes read from the input
+ * @throws IOException
+ */
+ static int deserializeIndex(String cfName, DataInput in, List<ColumnIndexInfo> columnIndexList) throws IOException
+ {
+ /* read only the column index list */
+ int columnIndexSize = in.readInt();
+ int totalBytesRead = 4;
+
+ /* read the indexes into a separate buffer */
+ DataOutputBuffer indexOut = new DataOutputBuffer();
+ /* write the data into buffer */
+ indexOut.write(in, columnIndexSize);
+ totalBytesRead += columnIndexSize;
+
+ /* now deserialize the index list */
+ DataInputBuffer indexIn = new DataInputBuffer();
+ indexIn.reset(indexOut.getData(), indexOut.getLength());
+ String columnName;
+ int position;
+ int numCols;
+
+ TypeInfo typeInfo = DatabaseDescriptor.getTypeInfo(cfName);
+ if ( DatabaseDescriptor.getColumnFamilyType(cfName).equals("Super") || DatabaseDescriptor.isNameSortingEnabled(cfName) )
+ {
+ typeInfo = TypeInfo.STRING;
+ }
+
+ while(indexIn.available() > 0)
+ {
+ ColumnIndexInfo cIndexInfo = ColumnIndexFactory.instance(typeInfo);
+ cIndexInfo = cIndexInfo.deserialize(indexIn);
+ columnIndexList.add(cIndexInfo);
+ }
+
+ return totalBytesRead;
+ }
+
+ /**
+ * Returns the range in which a given column falls in the index
+ * @param column The column whose range needs to be found
+ * @param columnIndexList the in-memory representation of the column index
+ * @param dataSize the total size of the data
+ * @param totalNumCols total number of columns
+ * @return an object describing a subrange in which the column is serialized
+ */
+ static ColumnRange getColumnRangeFromNameIndex(IndexHelper.ColumnIndexInfo cIndexInfo, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
+ {
+ /* find the offset for the column */
+ int size = columnIndexList.size();
+ long start = 0;
+ long end = dataSize;
+ int numColumns = 0;
+
+ int index = Collections.binarySearch(columnIndexList, cIndexInfo);
+ if ( index < 0 )
+ {
+ /* We are here which means that the requested column is not an index. */
+ index = (++index)*(-1);
+ }
+ else
+ {
+ ++index;
+ }
+
+ /* calculate the starting offset from which we have to read */
+ start = (index == 0) ? 0 : columnIndexList.get(index - 1).position();
+
+ if( index < size )
+ {
+ end = columnIndexList.get(index).position();
+ numColumns = columnIndexList.get(index).count();
+ }
+ else
+ {
+ end = dataSize;
+ int totalColsIndexed = 0;
+ for( IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList )
+ {
+ totalColsIndexed += colPosInfo.count();
+ }
+ numColumns = totalNumCols - totalColsIndexed;
+ }
+
+ return new ColumnRange(start, end, numColumns);
+ }
+
+ /**
+ * Returns the sub-ranges that contain the list of columns in columnNames.
+ * @param columnNames The list of columns whose subranges need to be found
+ * @param columnIndexList the deserialized column indexes
+ * @param dataSize the total size of data
+ * @param totalNumCols the total number of columns
+ * @return a list of subranges which contain all the columns in columnNames
+ */
+ static List<ColumnRange> getMultiColumnRangesFromNameIndex(List<String> columnNames, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
+ {
+ List<ColumnRange> columnRanges = new ArrayList<ColumnRange>();
+
+ if ( columnIndexList.size() == 0 )
+ {
+ columnRanges.add( new ColumnRange(0, dataSize, totalNumCols) );
+ }
+ else
+ {
+ Map<Long, Boolean> offset = new HashMap<Long, Boolean>();
+ for(String column : columnNames)
+ {
+ IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnNameIndexInfo(column);
+ ColumnRange columnRange = getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols);
+ if ( offset.get( columnRange.coordinate().start_ ) == null )
+ {
+ columnRanges.add(columnRange);
+ offset.put(columnRange.coordinate().start_, true);
+ }
+ }
+ }
+
+ return columnRanges;
+ }
+
+ /**
+ * Returns the range in which a given column falls in the index. This
+ * is used when time range queries are in play. For instance if we are
+ * looking for columns in the range [t, t2]
+ * @param cIndexInfo the time we are interested in.
+ * @param columnIndexList the in-memory representation of the column index
+ * @param dataSize the total size of the data
+ * @param totalNumCols total number of columns
+ * @return an object describing a subrange in which the column is serialized
+ */
+ static ColumnRange getColumnRangeFromTimeIndex(IndexHelper.TimeRange timeRange, List<IndexHelper.ColumnIndexInfo> columnIndexList, int dataSize, int totalNumCols)
+ {
+ /* if column indexes were not present for this column family, the handle accordingly */
+ if(columnIndexList.size() == 0)
+ {
+ return new ColumnRange(0, dataSize, totalNumCols);
+ }
+
+ /* find the offset for the column */
+ int size = columnIndexList.size();
+ long start = 0;
+ long end = dataSize;
+ int numColumns = 0;
+
+ /*
+ * Time indicies are sorted in descending order. So
+ * we need to apply a reverse compartor for the
+ * binary search.
+ */
+ Comparator<IndexHelper.ColumnIndexInfo> comparator = Collections.reverseOrder();
+ IndexHelper.ColumnIndexInfo rhs = IndexHelper.ColumnIndexFactory.instance(TypeInfo.LONG);
+ rhs.set(timeRange.rhs());
+ int index = Collections.binarySearch(columnIndexList, rhs, comparator);
+ if ( index < 0 )
+ {
+ /* We are here which means that the requested column is not an index. */
+ index = (++index)*(-1);
+ }
+ else
+ {
+ ++index;
+ }
+
+ /*
+ * Calculate the starting offset from which we have to read. So
+ * we achieve this by performing the probe using the bigger timestamp
+ * and then scanning the column position chunks till we reach the
+ * lower timestamp in the time range.
+ */
+ start = (index == 0) ? 0 : columnIndexList.get(index - 1).position();
+ /* add the number of colunms in the first chunk. */
+ numColumns += (index ==0) ? columnIndexList.get(0).count() : columnIndexList.get(index - 1).count();
+ if( index < size )
+ {
+ int chunks = columnIndexList.size();
+ /* Index info for the lower bound of the time range */
+ IndexHelper.ColumnIndexInfo lhs = IndexHelper.ColumnIndexFactory.instance(TypeInfo.LONG);
+ lhs.set(timeRange.lhs());
+ int i = index + 1;
+ for ( ; i < chunks; ++i )
+ {
+ IndexHelper.ColumnIndexInfo cIndexInfo2 = columnIndexList.get(i);
+ if ( cIndexInfo2.compareTo(lhs) < 0 )
+ {
+ numColumns += cIndexInfo2.count();
+ break;
+ }
+ numColumns += cIndexInfo2.count();
+ }
+
+ end = columnIndexList.get(i).position();
+ }
+ else
+ {
+ end = dataSize;
+ int totalColsIndexed = 0;
+ for( IndexHelper.ColumnIndexInfo colPosInfo : columnIndexList )
+ {
+ totalColsIndexed += colPosInfo.count();
+ }
+ numColumns = totalNumCols - totalColsIndexed;
+ }
+
+ return new ColumnRange(start, end, numColumns);
+ }
+
+ public static class ColumnIndexFactory
+ {
+ public static ColumnIndexInfo instance(TypeInfo typeInfo)
+ {
+ ColumnIndexInfo cIndexInfo = null;
+ switch(typeInfo)
+ {
+ case STRING:
+ cIndexInfo = new ColumnNameIndexInfo();
+ break;
+
+ case LONG:
+ cIndexInfo = new ColumnTimestampIndexInfo();
+ break;
+ }
+ return cIndexInfo;
+ }
+ }
+
+ /**
+ * Encapsulates a time range. Queries use
+ * this abstraction for indicating start
+ * and end regions of a time filter.
+ *
+ * @author alakshman
+ *
+ */
+ public static class TimeRange
+ {
+ private long lhs_;
+ private long rhs_;
+
+ public TimeRange(long lhs, long rhs)
+ {
+ lhs_ = lhs;
+ rhs_ = rhs;
+ }
+
+ public long lhs()
+ {
+ return lhs_;
+ }
+
+ public long rhs()
+ {
+ return rhs_;
+ }
+ }
+
+ /**
+ * A column range containing the start and end
+ * offset of the appropriate column index chunk
+ * and the number of columns in that chunk.
+ * @author alakshman
+ *
+ */
+ public static class ColumnRange
+ {
+ private Coordinate coordinate_;
+ private int columnCount_;
+
+ ColumnRange(long start, long end, int columnCount)
+ {
+ coordinate_ = new Coordinate(start, end);
+ columnCount_ = columnCount;
+ }
+
+ Coordinate coordinate()
+ {
+ return coordinate_;
+ }
+
+ int count()
+ {
+ return columnCount_;
+ }
+ }
+
+ /**
+ * A helper class to generate indexes while
+ * the columns are sorted by name on disk.
+ */
+ public static abstract class ColumnIndexInfo implements Comparable<ColumnIndexInfo>
+ {
+ private long position_;
+ private int columnCount_;
+
+ ColumnIndexInfo(long position, int columnCount)
+ {
+ position_ = position;
+ columnCount_ = columnCount;
+ }
+
+ public long position()
+ {
+ return position_;
+ }
+
+ public void position(long position)
+ {
+ position_ = position;
+ }
+
+ int count()
+ {
+ return columnCount_;
+ }
+
+ public void count(int count)
+ {
+ columnCount_ = count;
+ }
+
+ public abstract void set(Object o);
+ public abstract void serialize(DataOutputStream dos) throws IOException;
+ public abstract ColumnIndexInfo deserialize(DataInputStream dis) throws IOException;
+
+ public int size()
+ {
+ /* size of long for "position_" + size of columnCount_ */
+ return (8 + 4);
+ }
+ }
+
+ static class ColumnNameIndexInfo extends ColumnIndexInfo
+ {
+ private String name_;
+
+ ColumnNameIndexInfo()
+ {
+ super(0L, 0);
+ }
+
+ ColumnNameIndexInfo(String name)
+ {
+ this(name, 0L, 0);
+ }
+
+ ColumnNameIndexInfo(String name, long position, int columnCount)
+ {
+ super(position, columnCount);
+ name_ = name;
+ }
+
+ String name()
+ {
+ return name_;
+ }
+
+ public void set(Object o)
+ {
+ name_ = (String)o;
+ }
+
+ public int compareTo(ColumnIndexInfo rhs)
+ {
+ IndexHelper.ColumnNameIndexInfo cIndexInfo = (IndexHelper.ColumnNameIndexInfo)rhs;
+ return name_.compareTo(cIndexInfo.name_);
+ }
+
+ public void serialize(DataOutputStream dos) throws IOException
+ {
+ dos.writeLong(position());
+ dos.writeInt(count());
+ dos.writeUTF(name_);
+ }
+
+ public ColumnNameIndexInfo deserialize(DataInputStream dis) throws IOException
+ {
+ long position = dis.readLong();
+ int columnCount = dis.readInt();
+ String name = dis.readUTF();
+ return new ColumnNameIndexInfo(name, position, columnCount);
+ }
+
+ public int size()
+ {
+ int size = super.size();
+ /* Size of the name_ as an UTF8 and the actual length as a short for the readUTF. */
+ size += FBUtilities.getUTF8Length(name_) + IColumn.UtfPrefix_;
+ return size;
+ }
+ }
+
+ static class ColumnTimestampIndexInfo extends ColumnIndexInfo
+ {
+ private long timestamp_;
+
+ ColumnTimestampIndexInfo()
+ {
+ super(0L, 0);
+ }
+
+ ColumnTimestampIndexInfo(long timestamp)
+ {
+ this(timestamp, 0L, 0);
+ }
+
+ ColumnTimestampIndexInfo(long timestamp, long position, int columnCount)
+ {
+ super(position, columnCount);
+ timestamp_ = timestamp;
+ }
+
+ public long timestamp()
+ {
+ return timestamp_;
+ }
+
+ public void set(Object o)
+ {
+ timestamp_ = (Long)o;
+ }
+
+ public int compareTo(ColumnIndexInfo rhs)
+ {
+ ColumnTimestampIndexInfo cIndexInfo = (ColumnTimestampIndexInfo)rhs;
+ return Long.valueOf(timestamp_).compareTo(Long.valueOf(cIndexInfo.timestamp_));
+ }
+
+ public void serialize(DataOutputStream dos) throws IOException
+ {
+ dos.writeLong(position());
+ dos.writeInt(count());
+ dos.writeLong(timestamp_);
+ }
+
+ public ColumnTimestampIndexInfo deserialize(DataInputStream dis) throws IOException
+ {
+ long position = dis.readLong();
+ int columnCount = dis.readInt();
+ long timestamp = dis.readLong();
+ return new ColumnTimestampIndexInfo(timestamp, position, columnCount);
+ }
+
+ public int size()
+ {
+ int size = super.size();
+ /* add the size of the timestamp which is a long */
+ size += 8;
+ return size;
+ }
+ }
+}