You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text rendering.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/06 17:10:12 UTC
svn commit: r762376 -
/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
Author: jbellis
Date: Mon Apr 6 15:10:12 2009
New Revision: 762376
URL: http://svn.apache.org/viewvc?rev=762376&view=rev
Log:
reformat code (mostly whitespace changes) to keep subsequent patches clean. patch by jbellis; reviewed by Jun Rau. see #52
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=762376&r1=762375&r2=762376&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Mon Apr 6 15:10:12 2009
@@ -35,6 +35,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.LogUtil;
+
import org.apache.log4j.Logger;
/**
@@ -43,7 +44,7 @@
* jump to random positions to read data from the file. This class
* also has many implementations of the IFileWriter and IFileReader
* interfaces which are exposed through factory methods.
- *
+ * <p/>
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com ) & Karthik Ranganathan ( kranganathan@facebook.com )
*/
@@ -79,23 +80,23 @@
super(filename);
init(filename);
}
-
+
Writer(String filename, int size) throws IOException
{
super(filename);
init(filename, size);
}
-
+
protected void init(String filename) throws IOException
{
File file = new File(filename);
- if ( !file.exists() )
+ if (!file.exists())
{
file.createNewFile();
}
file_ = new RandomAccessFile(file, "rw");
}
-
+
protected void init(String filename, int size) throws IOException
{
init(filename);
@@ -115,11 +116,11 @@
{
file_.write(buffer.getData(), 0, buffer.getLength());
}
-
+
public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException
{
int keyBufLength = keyBuffer.getLength();
- if ( keyBuffer == null || keyBufLength == 0 )
+ if (keyBuffer == null || keyBufLength == 0)
throw new IllegalArgumentException("Key cannot be NULL or of zero length.");
file_.seek(file_.getFilePointer());
@@ -133,7 +134,7 @@
public void append(String key, DataOutputBuffer buffer) throws IOException
{
- if ( key == null )
+ if (key == null)
throw new IllegalArgumentException("Key cannot be NULL.");
file_.seek(file_.getFilePointer());
@@ -145,7 +146,7 @@
public void append(String key, byte[] value) throws IOException
{
- if ( key == null )
+ if (key == null)
throw new IllegalArgumentException("Key cannot be NULL.");
file_.seek(file_.getFilePointer());
@@ -156,7 +157,7 @@
public void append(String key, long value) throws IOException
{
- if ( key == null )
+ if (key == null)
throw new IllegalArgumentException("Key cannot be NULL.");
file_.seek(file_.getFilePointer());
@@ -169,14 +170,15 @@
* used to write the commit log header in the commit logs.
* If not used carefully it could completely screw up reads
* of other key/value pairs that are written.
+ *
* @param bytes the bytes to write
- */
+ */
public long writeDirect(byte[] bytes) throws IOException
{
file_.write(bytes);
return file_.getFilePointer();
}
-
+
public void writeLong(long value) throws IOException
{
file_.writeLong(value);
@@ -191,7 +193,7 @@
{
file_.writeUTF(SequenceFile.marker_);
file_.writeInt(size);
- file_.write(footer, 0, size);
+ file_.write(footer, 0, size);
}
public String getFileName()
@@ -212,25 +214,25 @@
{
super(filename, size);
}
-
+
@Override
protected void init(String filename) throws IOException
{
init(filename, 0);
}
-
+
@Override
protected void init(String filename, int size) throws IOException
{
File file = new File(filename);
file_ = new BufferedRandomAccessFile(file, "rw", size);
- if ( !file.exists() )
+ if (!file.exists())
{
file.createNewFile();
}
}
}
-
+
public static class ChecksumWriter extends Writer
{
@@ -238,20 +240,20 @@
{
super(filename, size);
}
-
+
@Override
protected void init(String filename) throws IOException
{
init(filename, 0);
}
-
+
@Override
protected void init(String filename, int size) throws IOException
{
File file = new File(filename);
file_ = new ChecksumRandomAccessFile(file, "rw", size);
}
-
+
@Override
public void close() throws IOException
{
@@ -289,16 +291,16 @@
byteBuffer.flip();
fc_.write(byteBuffer);
}
-
+
public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException
{
int keyBufLength = keyBuffer.getLength();
- if ( keyBuffer == null || keyBufLength == 0 )
+ if (keyBuffer == null || keyBufLength == 0)
throw new IllegalArgumentException("Key cannot be NULL or of zero length.");
/* Size allocated "int" for key length + key + "int" for data length + data */
int length = buffer.getLength();
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect( 4 + keyBufLength + 4 + length );
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4 + keyBufLength + 4 + length);
byteBuffer.putInt(keyBufLength);
byteBuffer.put(keyBuffer.getData(), 0, keyBufLength);
byteBuffer.putInt(length);
@@ -309,12 +311,12 @@
public void append(String key, DataOutputBuffer buffer) throws IOException
{
- if ( key == null )
+ if (key == null)
throw new IllegalArgumentException("Key cannot be NULL.");
int length = buffer.getLength();
/* Size allocated : utfPrefix_ + key length + "int" for data size + data */
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect( SequenceFile.utfPrefix_ + key.length() + 4 + length);
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(SequenceFile.utfPrefix_ + key.length() + 4 + length);
SequenceFile.writeUTF(byteBuffer, key);
byteBuffer.putInt(length);
byteBuffer.put(buffer.getData(), 0, length);
@@ -324,7 +326,7 @@
public void append(String key, byte[] value) throws IOException
{
- if ( key == null )
+ if (key == null)
throw new IllegalArgumentException("Key cannot be NULL.");
/* Size allocated key length + "int" for data size + data */
@@ -338,7 +340,7 @@
public void append(String key, long value) throws IOException
{
- if ( key == null )
+ if (key == null)
throw new IllegalArgumentException("Key cannot be NULL.");
/* Size allocated key length + a long */
@@ -363,7 +365,7 @@
fc_.write(byteBuffer);
return fc_.position();
}
-
+
public void writeLong(long value) throws IOException
{
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(8);
@@ -380,12 +382,12 @@
public void close(byte[] footer, int size) throws IOException
{
/* Size is marker length + "int" for size + footer data */
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect( utfPrefix_ + SequenceFile.marker_.length() + 4 + footer.length);
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(utfPrefix_ + SequenceFile.marker_.length() + 4 + footer.length);
SequenceFile.writeUTF(byteBuffer, SequenceFile.marker_);
byteBuffer.putInt(size);
byteBuffer.put(footer);
byteBuffer.flip();
- fc_.write(byteBuffer);
+ fc_.write(byteBuffer);
}
public String getFileName()
@@ -398,7 +400,7 @@
return fc_.size();
}
}
-
+
public static class FastConcurrentWriter extends AbstractWriter
{
private FileChannel fc_;
@@ -408,30 +410,30 @@
{
super(filename);
fc_ = new RandomAccessFile(filename, "rw").getChannel();
- buffer_ = fc_.map( FileChannel.MapMode.READ_WRITE, 0, size );
+ buffer_ = fc_.map(FileChannel.MapMode.READ_WRITE, 0, size);
buffer_.load();
}
void unmap(final Object buffer)
{
- AccessController.doPrivileged( new PrivilegedAction<MappedByteBuffer>()
- {
- public MappedByteBuffer run()
- {
- try
- {
- Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
- getCleanerMethod.setAccessible(true);
- sun.misc.Cleaner cleaner = (sun.misc.Cleaner)getCleanerMethod.invoke(buffer,new Object[0]);
- cleaner.clean();
- }
- catch(Throwable e)
- {
- logger_.warn( LogUtil.throwableToString(e) );
- }
- return null;
- }
- });
+ AccessController.doPrivileged(new PrivilegedAction<MappedByteBuffer>()
+ {
+ public MappedByteBuffer run()
+ {
+ try
+ {
+ Method getCleanerMethod = buffer.getClass().getMethod("cleaner", new Class[0]);
+ getCleanerMethod.setAccessible(true);
+ sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(buffer, new Object[0]);
+ cleaner.clean();
+ }
+ catch (Throwable e)
+ {
+ logger_.warn(LogUtil.throwableToString(e));
+ }
+ return null;
+ }
+ });
}
@@ -442,18 +444,18 @@
public void seek(long position) throws IOException
{
- buffer_.position((int)position);
+ buffer_.position((int) position);
}
public void append(DataOutputBuffer buffer) throws IOException
{
buffer_.put(buffer.getData(), 0, buffer.getLength());
}
-
+
public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException
{
int keyBufLength = keyBuffer.getLength();
- if ( keyBuffer == null || keyBufLength == 0 )
+ if (keyBuffer == null || keyBufLength == 0)
throw new IllegalArgumentException("Key cannot be NULL or of zero length.");
int length = buffer.getLength();
@@ -465,7 +467,7 @@
public void append(String key, DataOutputBuffer buffer) throws IOException
{
- if ( key == null )
+ if (key == null)
throw new IllegalArgumentException("Key cannot be NULL.");
int length = buffer.getLength();
@@ -476,7 +478,7 @@
public void append(String key, byte[] value) throws IOException
{
- if ( key == null )
+ if (key == null)
throw new IllegalArgumentException("Key cannot be NULL.");
SequenceFile.writeUTF(buffer_, key);
@@ -486,7 +488,7 @@
public void append(String key, long value) throws IOException
{
- if ( key == null )
+ if (key == null)
throw new IllegalArgumentException("Key cannot be NULL.");
SequenceFile.writeUTF(buffer_, key);
@@ -504,7 +506,7 @@
buffer_.put(bytes);
return buffer_.position();
}
-
+
public void writeLong(long value) throws IOException
{
buffer_.putLong(value);
@@ -540,7 +542,7 @@
public static abstract class AbstractReader implements IFileReader
{
private static final short utfPrefix_ = 2;
- protected RandomAccessFile file_;
+ protected RandomAccessFile file_;
protected String filename_;
AbstractReader(String filename)
@@ -551,38 +553,40 @@
public String getFileName()
{
return filename_;
- }
-
+ }
+
/**
* Given the application key this method basically figures if
* the key is in the block. Key comparisons differ based on the
* partition function. In OPHF key is stored as is but in the
* case of a Random hash key used internally is hash(key):key.
+ *
* @param key which we are looking for
- * @param in DataInput stream into which we are looking for the key.
+ * @param in DataInput stream into which we are looking for the key.
* @return true if key is found and false otherwise.
* @throws IOException
*/
protected boolean isKeyInBlock(String key, DataInput in) throws IOException
{
- boolean bVal = false;
+ boolean bVal = false;
String keyInBlock = in.readUTF();
PartitionerType pType = StorageService.getPartitionerType();
- switch ( pType )
+ switch (pType)
{
case OPHF:
bVal = keyInBlock.equals(key);
break;
-
- default:
+
+ default:
bVal = keyInBlock.split(":")[0].equals(key);
break;
}
return bVal;
}
-
+
/**
* Return the position of the given key from the block index.
+ *
* @param key the key whose offset is to be extracted from the current block index
*/
public long getPositionFromBlockIndex(String key) throws IOException
@@ -590,9 +594,9 @@
long position = -1L;
/* note the beginning of the block index */
long blockIndexPosition = file_.getFilePointer();
- /* read the block key. */
+ /* read the block key. */
String blockIndexKey = file_.readUTF();
- if ( !blockIndexKey.equals(SSTable.blockIndexKey_) )
+ if (!blockIndexKey.equals(SSTable.blockIndexKey_))
throw new IOException("Unexpected position to be reading the block index from.");
/* read the size of the block index */
int size = file_.readInt();
@@ -602,14 +606,14 @@
file_.readFully(bytes);
DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bytes, bytes.length);
+ bufIn.reset(bytes, bytes.length);
/* Number of keys in the block. */
int keys = bufIn.readInt();
- for ( int i = 0; i < keys; ++i )
- {
- String keyInBlock = bufIn.readUTF();
- if ( keyInBlock.equals(key) )
- {
+ for (int i = 0; i < keys; ++i)
+ {
+ String keyInBlock = bufIn.readUTF();
+ if (keyInBlock.equals(key))
+ {
position = bufIn.readLong();
break;
}
@@ -623,13 +627,13 @@
bufIn.readLong();
bufIn.readLong();
}
- }
-
+ }
+
/* we do this because relative position of the key within a block is stored. */
- if ( position != -1L )
+ if (position != -1L)
position = blockIndexPosition - position;
else
- throw new IOException("This key " + key + " does not exist in this file.");
+ throw new IOException("This key " + key + " does not exist in this file.");
return position;
}
@@ -641,7 +645,7 @@
SSTable.BlockMetadata blockMetadata = SSTable.BlockMetadata.NULL;
/* read the block key. */
String blockIndexKey = file_.readUTF();
- if ( !blockIndexKey.equals(SSTable.blockIndexKey_) )
+ if (!blockIndexKey.equals(SSTable.blockIndexKey_))
throw new IOException("Unexpected position to be reading the block index from.");
/* read the size of the block index */
int size = file_.readInt();
@@ -655,9 +659,9 @@
/* Number of keys in the block. */
int keys = bufIn.readInt();
- for ( int i = 0; i < keys; ++i )
- {
- if ( isKeyInBlock(key, bufIn) )
+ for (int i = 0; i < keys; ++i)
+ {
+ if (isKeyInBlock(key, bufIn))
{
long position = bufIn.readLong();
long dataSize = bufIn.readLong();
@@ -683,25 +687,26 @@
* This function seeks to the position where the key data is present in the file
* in order to get the buffer cache populated with the key-data. This is done as
* a hint before the user actually queries the data.
- * @param key the key whose data is being touched
+ *
+ * @param key the key whose data is being touched
* @param fData
*/
public long touch(String key, boolean fData) throws IOException
{
long bytesRead = -1L;
- if ( isEOF() )
+ if (isEOF())
return bytesRead;
long startPosition = file_.getFilePointer();
String keyInDisk = file_.readUTF();
- if ( keyInDisk != null )
+ if (keyInDisk != null)
{
/*
* If key on disk is greater than requested key
* we can bail out since we exploit the property
* of the SSTable format.
*/
- if ( keyInDisk.compareTo(key) > 0 )
+ if (keyInDisk.compareTo(key) > 0)
return bytesRead;
/*
@@ -710,7 +715,7 @@
* position ourselves to read the next one.
*/
int dataSize = file_.readInt();
- if ( keyInDisk.equals(key) )
+ if (keyInDisk.equals(key))
{
/* return 0L to signal the key has been touched. */
bytesRead = 0L;
@@ -728,12 +733,13 @@
return bytesRead;
}
-
+
/**
* This method seek the disk head to the block index, finds
* the offset of the key within the block and seeks to that
* offset.
- * @param key we are interested in.
+ *
+ * @param key we are interested in.
* @param section indicates the location of the block index.
* @throws IOException
*/
@@ -741,12 +747,13 @@
{
/* Goto the Block Index */
seek(section.end_);
- long position = getPositionFromBlockIndex(key);
- seek(position);
+ long position = getPositionFromBlockIndex(key);
+ seek(position);
}
-
+
/**
* Defreeze the bloom filter.
+ *
* @return bloom filter summarizing the column information
* @throws IOException
*/
@@ -760,10 +767,11 @@
BloomFilter bf = BloomFilter.serializer().deserialize(bufIn);
return bf;
}
-
+
/**
- * Reads the column name indexes if present. If the
+ * Reads the column name indexes if present. If the
* indexes are based on time then skip over them.
+ *
* @param cfName
* @return
*/
@@ -771,13 +779,13 @@
{
/* check if we have an index */
boolean hasColumnIndexes = file_.readBoolean();
- int totalBytesRead = 1;
+ int totalBytesRead = 1;
/* if we do then deserialize the index */
- if(hasColumnIndexes)
- {
- if ( DatabaseDescriptor.isNameSortingEnabled(cfName) || DatabaseDescriptor.getColumnFamilyType(cfName).equals("Super") )
+ if (hasColumnIndexes)
+ {
+ if (DatabaseDescriptor.isNameSortingEnabled(cfName) || DatabaseDescriptor.getColumnFamilyType(cfName).equals("Super"))
{
- /* read the index */
+ /* read the index */
totalBytesRead += IndexHelper.deserializeIndex(cfName, file_, columnIndexList);
}
else
@@ -787,10 +795,11 @@
}
return totalBytesRead;
}
-
+
/**
- * Reads the column name indexes if present. If the
+ * Reads the column name indexes if present. If the
* indexes are based on time then skip over them.
+ *
* @param cfName
* @return
*/
@@ -798,13 +807,13 @@
{
/* check if we have an index */
boolean hasColumnIndexes = file_.readBoolean();
- int totalBytesRead = 1;
+ int totalBytesRead = 1;
/* if we do then deserialize the index */
- if(hasColumnIndexes)
- {
- if ( DatabaseDescriptor.isTimeSortingEnabled(cfName) )
+ if (hasColumnIndexes)
+ {
+ if (DatabaseDescriptor.isTimeSortingEnabled(cfName))
{
- /* read the index */
+ /* read the index */
totalBytesRead += IndexHelper.deserializeIndex(cfName, file_, columnIndexList);
}
else
@@ -814,13 +823,13 @@
}
return totalBytesRead;
}
-
+
/**
- * This is useful in figuring out the key in system. If an OPHF
+ * This is useful in figuring out the key in system. If an OPHF
* is used then the "key" is the application supplied key. If a random
- * partitioning mechanism is used then the key is of the form
+ * partitioning mechanism is used then the key is of the form
* hash:key where hash is used internally as the key.
- *
+ *
* @param in the DataInput stream from which the key needs to be read
* @return the appropriate key based on partitioning type
* @throws IOException
@@ -829,12 +838,12 @@
{
String keyInDisk = null;
PartitionerType pType = StorageService.getPartitionerType();
- switch( pType )
+ switch (pType)
{
case OPHF:
- keyInDisk = in.readUTF();
+ keyInDisk = in.readUTF();
break;
-
+
default:
keyInDisk = in.readUTF().split(":")[0];
break;
@@ -847,33 +856,33 @@
* passed in. Always use this method to query for application
* specific data as it will have indexes.
*
- * @param key key we are interested in.
- * @param bufOut DataOutputStream that needs to be filled.
- * @param cf the IColumn we want to read
+ * @param key key we are interested in.
+ * @param bufOut DataOutputStream that needs to be filled.
+ * @param cf the IColumn we want to read
* @param section region of the file that needs to be read
* @return total number of bytes read/considered
- */
+ */
public long next(String key, DataOutputBuffer bufOut, String cf, Coordinate section) throws IOException
{
- String[] values = RowMutation.getColumnAndColumnFamily(cf);
- String columnFamilyName = values[0];
- String columnName = (values.length == 1) ? null : values[1];
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ String columnFamilyName = values[0];
+ String columnName = (values.length == 1) ? null : values[1];
long bytesRead = -1L;
- if ( isEOF() )
+ if (isEOF())
return bytesRead;
- seekTo(key, section);
+ seekTo(key, section);
/* note the position where the key starts */
long startPosition = file_.getFilePointer();
String keyInDisk = readKeyFromDisk(file_);
- if ( keyInDisk != null )
+ if (keyInDisk != null)
{
/*
* If key on disk is greater than requested key
* we can bail out since we exploit the property
* of the SSTable format.
*/
- if ( keyInDisk.compareTo(key) > 0 )
+ if (keyInDisk.compareTo(key) > 0)
return bytesRead;
/*
@@ -882,24 +891,24 @@
* position ourselves to read the next one.
*/
int dataSize = file_.readInt();
- if ( keyInDisk.equals(key) )
+ if (keyInDisk.equals(key))
{
/* write the key into buffer */
- bufOut.writeUTF( keyInDisk );
-
- if(columnName == null)
+ bufOut.writeUTF(keyInDisk);
+
+ if (columnName == null)
{
- int bytesSkipped = IndexHelper.skipBloomFilterAndIndex(file_);
- /*
- * read the correct number of bytes for the column family and
- * write data into buffer. Substract from dataSize the bloom
- * filter size.
- */
- dataSize -= bytesSkipped;
- /* write the data size */
- bufOut.writeInt(dataSize);
- /* write the data into buffer, except the boolean we have read */
- bufOut.write(file_, dataSize);
+ int bytesSkipped = IndexHelper.skipBloomFilterAndIndex(file_);
+ /*
+ * read the correct number of bytes for the column family and
+ * write data into buffer. Substract from dataSize the bloom
+ * filter size.
+ */
+ dataSize -= bytesSkipped;
+ /* write the data size */
+ bufOut.writeInt(dataSize);
+ /* write the data into buffer, except the boolean we have read */
+ bufOut.write(file_, dataSize);
}
else
{
@@ -907,15 +916,15 @@
long preBfPos = file_.getFilePointer();
BloomFilter bf = defreezeBloomFilter();
/* column does not exist in this file */
- if ( !bf.isPresent(columnName) )
+ if (!bf.isPresent(columnName))
return bytesRead;
long postBfPos = file_.getFilePointer();
dataSize -= (postBfPos - preBfPos);
-
+
List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
/* Read the name indexes if present */
- int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
- dataSize -= totalBytesRead;
+ int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
+ dataSize -= totalBytesRead;
/* read the column family name */
String cfName = file_.readUTF();
@@ -928,16 +937,16 @@
/* read the total number of columns */
int totalNumCols = file_.readInt();
dataSize -= 4;
-
+
/* get the column range we have to read */
IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnNameIndexInfo(columnName);
IndexHelper.ColumnRange columnRange = IndexHelper.getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols);
Coordinate coordinate = columnRange.coordinate();
- /* seek to the correct offset to the data, and calculate the data size */
- file_.skipBytes((int)coordinate.start_);
- dataSize = (int)(coordinate.end_ - coordinate.start_);
-
+ /* seek to the correct offset to the data, and calculate the data size */
+ file_.skipBytes((int) coordinate.start_);
+ dataSize = (int) (coordinate.end_ - coordinate.start_);
+
/*
* write the number of columns in the column family we are returning:
* dataSize that we are reading +
@@ -945,7 +954,7 @@
* one booleanfor deleted or not +
* one int for number of columns
*/
- bufOut.writeInt(dataSize + utfPrefix_+cfName.length() + 4 + 1);
+ bufOut.writeInt(dataSize + utfPrefix_ + cfName.length() + 4 + 1);
/* write the column family name */
bufOut.writeUTF(cfName);
/* write if this cf is marked for delete */
@@ -959,50 +968,50 @@
else
{
/* skip over data portion */
- file_.seek(dataSize + file_.getFilePointer());
+ file_.seek(dataSize + file_.getFilePointer());
}
-
+
long endPosition = file_.getFilePointer();
- bytesRead = endPosition - startPosition;
+ bytesRead = endPosition - startPosition;
}
return bytesRead;
}
-
+
/**
* This method dumps the next key/value into the DataOuputStream
* passed in. Always use this method to query for application
* specific data as it will have indexes.
-
- * @param key key we are interested in.
- * @param bufOut DataOutputStream that needs to be filled.
- * @param cf name of the column in our format.
+ *
+ * @param key key we are interested in.
+ * @param bufOut DataOutputStream that needs to be filled.
+ * @param cf name of the column in our format.
* @param timeRange time range we are interested in.
- * @param section region of the file that needs to be read
- * @throws IOException
+ * @param section region of the file that needs to be read
* @return number of bytes that were read.
- */
+ * @throws IOException
+ */
public long next(String key, DataOutputBuffer bufOut, String cf, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
{
String[] values = RowMutation.getColumnAndColumnFamily(cf);
- String columnFamilyName = values[0];
+ String columnFamilyName = values[0];
String columnName = (values.length == 1) ? null : values[1];
long bytesRead = -1L;
- if ( isEOF() )
- return bytesRead;
- seekTo(key, section);
+ if (isEOF())
+ return bytesRead;
+ seekTo(key, section);
/* note the position where the key starts */
long startPosition = file_.getFilePointer();
String keyInDisk = readKeyFromDisk(file_);
- if ( keyInDisk != null )
+ if (keyInDisk != null)
{
/*
* If key on disk is greater than requested key
* we can bail out since we exploit the property
* of the SSTable format.
*/
- if ( keyInDisk.compareTo(key) > 0 )
+ if (keyInDisk.compareTo(key) > 0)
return bytesRead;
/*
@@ -1011,25 +1020,25 @@
* position ourselves to read the next one.
*/
int dataSize = file_.readInt();
- if ( keyInDisk.equals(key) )
+ if (keyInDisk.equals(key))
{
/* write the key into buffer */
- bufOut.writeUTF( keyInDisk );
-
- if(columnName == null)
+ bufOut.writeUTF(keyInDisk);
+
+ if (columnName == null)
{
int bytesSkipped = IndexHelper.skipBloomFilter(file_);
/*
* read the correct number of bytes for the column family and
* write data into buffer. Substract from dataSize the bloom
* filter size.
- */
+ */
dataSize -= bytesSkipped;
List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
/* Read the times indexes if present */
- int totalBytesRead = handleColumnTimeIndexes(columnFamilyName, columnIndexList);
+ int totalBytesRead = handleColumnTimeIndexes(columnFamilyName, columnIndexList);
dataSize -= totalBytesRead;
-
+
/* read the column family name */
String cfName = file_.readUTF();
dataSize -= (utfPrefix_ + cfName.length());
@@ -1041,15 +1050,15 @@
/* read the total number of columns */
int totalNumCols = file_.readInt();
dataSize -= 4;
-
- /* get the column range we have to read */
+
+ /* get the column range we have to read */
IndexHelper.ColumnRange columnRange = IndexHelper.getColumnRangeFromTimeIndex(timeRange, columnIndexList, dataSize, totalNumCols);
Coordinate coordinate = columnRange.coordinate();
/* seek to the correct offset to the data, and calculate the data size */
- file_.skipBytes((int)coordinate.start_);
- dataSize = (int)(coordinate.end_ - coordinate.start_);
-
+ file_.skipBytes((int) coordinate.start_);
+ dataSize = (int) (coordinate.end_ - coordinate.start_);
+
/*
* write the number of columns in the column family we are returning:
* dataSize that we are reading +
@@ -1057,7 +1066,7 @@
* one booleanfor deleted or not +
* one int for number of columns
*/
- bufOut.writeInt(dataSize + utfPrefix_+cfName.length() + 4 + 1);
+ bufOut.writeInt(dataSize + utfPrefix_ + cfName.length() + 4 + 1);
/* write the column family name */
bufOut.writeUTF(cfName);
/* write if this cf is marked for delete */
@@ -1073,9 +1082,9 @@
/* skip over data portion */
file_.seek(dataSize + file_.getFilePointer());
}
-
+
long endPosition = file_.getFilePointer();
- bytesRead = endPosition - startPosition;
+ bytesRead = endPosition - startPosition;
}
return bytesRead;
@@ -1086,36 +1095,35 @@
* passed in. Always use this method to query for application
* specific data as it will have indexes.
*
- * @param key key we are interested in.
- * @param bufOut DataOutputStream that needs to be filled.
- * @param cf The name of the column family only without the ":"
+ * @param key key we are interested in.
+ * @param bufOut DataOutputStream that needs to be filled.
+ * @param cf The name of the column family only without the ":"
* @param columnNames The list of columns in the cfName column family that we want to return
- * @param section region of the file that needs to be read
+ * @param section region of the file that needs to be read
* @return total number of bytes read/considered
- *
- */
+ */
public long next(String key, DataOutputBuffer bufOut, String cf, List<String> columnNames, Coordinate section) throws IOException
{
- String[] values = RowMutation.getColumnAndColumnFamily(cf);
- String columnFamilyName = values[0];
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+ String columnFamilyName = values[0];
List<String> cNames = new ArrayList<String>(columnNames);
long bytesRead = -1L;
- if ( isEOF() )
+ if (isEOF())
return bytesRead;
- seekTo(key, section);
+ seekTo(key, section);
/* note the position where the key starts */
- long startPosition = file_.getFilePointer();
+ long startPosition = file_.getFilePointer();
String keyInDisk = readKeyFromDisk(file_);
- if ( keyInDisk != null )
+ if (keyInDisk != null)
{
/*
* If key on disk is greater than requested key
* we can bail out since we exploit the property
* of the SSTable format.
*/
- if ( keyInDisk.compareTo(key) > 0 )
+ if (keyInDisk.compareTo(key) > 0)
return bytesRead;
/*
@@ -1124,28 +1132,28 @@
* position ourselves to read the next one.
*/
int dataSize = file_.readInt();
- if ( keyInDisk.equals(key) )
+ if (keyInDisk.equals(key))
{
/* write the key into buffer */
- bufOut.writeUTF( keyInDisk );
-
+ bufOut.writeUTF(keyInDisk);
+
/* if we need to read the all the columns do not read the column indexes */
- if(cNames == null || cNames.size() == 0)
+ if (cNames == null || cNames.size() == 0)
{
- int bytesSkipped = IndexHelper.skipBloomFilterAndIndex(file_);
- /*
- * read the correct number of bytes for the column family and
- * write data into buffer
- */
- dataSize -= bytesSkipped;
- /* write the data size */
- bufOut.writeInt(dataSize);
- /* write the data into buffer, except the boolean we have read */
- bufOut.write(file_, dataSize);
+ int bytesSkipped = IndexHelper.skipBloomFilterAndIndex(file_);
+ /*
+ * read the correct number of bytes for the column family and
+ * write data into buffer
+ */
+ dataSize -= bytesSkipped;
+ /* write the data size */
+ bufOut.writeInt(dataSize);
+ /* write the data into buffer, except the boolean we have read */
+ bufOut.write(file_, dataSize);
}
else
{
- /* Read the bloom filter summarizing the columns */
+ /* Read the bloom filter summarizing the columns */
long preBfPos = file_.getFilePointer();
BloomFilter bf = defreezeBloomFilter();
long postBfPos = file_.getFilePointer();
@@ -1158,11 +1166,11 @@
cNames.remove(cName);
}
*/
-
+
List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
/* read the column name indexes if present */
- int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
- dataSize -= totalBytesRead;
+ int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
+ dataSize -= totalBytesRead;
/* read the column family name */
String cfName = file_.readUTF();
@@ -1174,8 +1182,8 @@
/* read the total number of columns */
int totalNumCols = file_.readInt();
- dataSize -= 4;
-
+ dataSize -= 4;
+
// TODO: this is name sorted - but eventually this should be sorted by the same criteria as the col index
/* sort the required list of columns */
Collections.sort(cNames);
@@ -1185,11 +1193,11 @@
/* calculate the data size */
int numColsReturned = 0;
int dataSizeReturned = 0;
- for(IndexHelper.ColumnRange columnRange : columnRanges)
+ for (IndexHelper.ColumnRange columnRange : columnRanges)
{
- numColsReturned += columnRange.count();
+ numColsReturned += columnRange.count();
Coordinate coordinate = columnRange.coordinate();
- dataSizeReturned += coordinate.end_ - coordinate.start_;
+ dataSizeReturned += coordinate.end_ - coordinate.start_;
}
/*
@@ -1199,7 +1207,7 @@
* one booleanfor deleted or not +
* one int for number of columns
*/
- bufOut.writeInt(dataSizeReturned + utfPrefix_+cfName.length() + 4 + 1);
+ bufOut.writeInt(dataSizeReturned + utfPrefix_ + cfName.length() + 4 + 1);
/* write the column family name */
bufOut.writeUTF(cfName);
/* write if this cf is marked for delete */
@@ -1208,24 +1216,24 @@
bufOut.writeInt(numColsReturned);
int prevPosition = 0;
/* now write all the columns we are required to write */
- for(IndexHelper.ColumnRange columnRange : columnRanges)
+ for (IndexHelper.ColumnRange columnRange : columnRanges)
{
/* seek to the correct offset to the data */
Coordinate coordinate = columnRange.coordinate();
- file_.skipBytes( (int)(coordinate.start_ - prevPosition) );
- bufOut.write( file_, (int)(coordinate.end_ - coordinate.start_) );
- prevPosition = (int)coordinate.end_;
+ file_.skipBytes((int) (coordinate.start_ - prevPosition));
+ bufOut.write(file_, (int) (coordinate.end_ - coordinate.start_));
+ prevPosition = (int) coordinate.end_;
}
}
}
else
{
/* skip over data portion */
- file_.seek(dataSize + file_.getFilePointer());
+ file_.seek(dataSize + file_.getFilePointer());
}
long endPosition = file_.getFilePointer();
- bytesRead = endPosition - startPosition;
+ bytesRead = endPosition - startPosition;
}
return bytesRead;
@@ -1241,19 +1249,19 @@
public long next(DataOutputBuffer bufOut) throws IOException
{
long bytesRead = -1L;
- if ( isEOF() )
+ if (isEOF())
return bytesRead;
long startPosition = file_.getFilePointer();
- String key = file_.readUTF();
- if ( key != null )
+ String key = file_.readUTF();
+ if (key != null)
{
/* write the key into buffer */
- bufOut.writeUTF( key );
+ bufOut.writeUTF(key);
int dataSize = file_.readInt();
/* write data size into buffer */
bufOut.writeInt(dataSize);
- /* write the data into buffer */
+ /* write the data into buffer */
bufOut.write(file_, dataSize);
long endPosition = file_.getFilePointer();
bytesRead = endPosition - startPosition;
@@ -1266,7 +1274,7 @@
* we return -1 indicating we are at the end of
* the file.
*/
- if ( key.equals(SequenceFile.marker_) )
+ if (key.equals(SequenceFile.marker_))
bytesRead = -1L;
return bytesRead;
}
@@ -1275,29 +1283,29 @@
* This method dumps the next key/value into the DataOuputStream
* passed in.
*
- * @param key - key we are interested in.
- * @param bufOut DataOutputStream that needs to be filled.
+ * @param key - key we are interested in.
+ * @param bufOut DataOutputStream that needs to be filled.
* @param section region of the file that needs to be read
* @return total number of bytes read/considered
*/
public long next(String key, DataOutputBuffer bufOut, Coordinate section) throws IOException
{
long bytesRead = -1L;
- if ( isEOF() )
+ if (isEOF())
return bytesRead;
-
- seekTo(key, section);
+
+ seekTo(key, section);
/* note the position where the key starts */
- long startPosition = file_.getFilePointer();
+ long startPosition = file_.getFilePointer();
String keyInDisk = readKeyFromDisk(file_);
- if ( keyInDisk != null )
+ if (keyInDisk != null)
{
/*
* If key on disk is greater than requested key
* we can bail out since we exploit the property
* of the SSTable format.
*/
- if ( keyInDisk.compareTo(key) > 0 )
+ if (keyInDisk.compareTo(key) > 0)
return bytesRead;
/*
@@ -1306,10 +1314,10 @@
* position ourselves to read the next one.
*/
int dataSize = file_.readInt();
- if ( keyInDisk.equals(key) )
+ if (keyInDisk.equals(key))
{
/* write the key into buffer */
- bufOut.writeUTF( keyInDisk );
+ bufOut.writeUTF(keyInDisk);
/* write data size into buffer */
bufOut.writeInt(dataSize);
/* write the data into buffer */
@@ -1318,7 +1326,7 @@
else
{
/* skip over data portion */
- file_.seek(dataSize + file_.getFilePointer());
+ file_.seek(dataSize + file_.getFilePointer());
}
long endPosition = file_.getFilePointer();
@@ -1328,7 +1336,7 @@
return bytesRead;
}
}
-
+
public static class Reader extends AbstractReader
{
Reader(String filename) throws IOException
@@ -1336,7 +1344,7 @@
super(filename);
init(filename);
}
-
+
protected void init(String filename) throws IOException
{
file_ = new RandomAccessFile(filename, "r");
@@ -1364,20 +1372,21 @@
public boolean isEOF() throws IOException
{
- return ( getCurrentPosition() == getEOF() );
+ return (getCurrentPosition() == getEOF());
}
/**
* Be extremely careful while using this API. This currently
* used to read the commit log header from the commit logs.
* Treat this as an internal API.
+ *
* @param bytes read from the buffer into the this array
- */
+ */
public void readDirect(byte[] bytes) throws IOException
{
file_.readFully(bytes);
}
-
+
public long readLong() throws IOException
{
return file_.readLong();
@@ -1388,9 +1397,9 @@
file_.close();
}
}
-
+
public static class BufferReader extends Reader
- {
+ {
private int size_;
BufferReader(String filename, int size) throws IOException
@@ -1398,15 +1407,15 @@
super(filename);
size_ = size;
}
-
+
protected void init(String filename) throws IOException
{
file_ = new BufferedRandomAccessFile(filename, "r", size_);
}
}
-
+
public static class ChecksumReader extends Reader
- {
+ {
private int size_;
ChecksumReader(String filename, int size) throws IOException
@@ -1414,14 +1423,14 @@
super(filename);
size_ = size;
}
-
+
protected void init(String filename) throws IOException
{
file_ = new ChecksumRandomAccessFile(filename, "r", size_);
}
}
-
- private static Logger logger_ = Logger.getLogger( SequenceFile.class ) ;
+
+ private static Logger logger_ = Logger.getLogger(SequenceFile.class);
public static final short utfPrefix_ = 2;
public static final String marker_ = "Bloom-Filter";
@@ -1455,9 +1464,10 @@
* Assuming all Strings that are passed in have length
* that can be represented as a short i.e length of the
* string is <= 65535
+ *
* @param buffer buffer to write the serialize version into
- * @param str string to serialize
- */
+ * @param str string to serialize
+ */
protected static void writeUTF(ByteBuffer buffer, String str)
{
int strlen = str.length();