You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:44:50 UTC
svn commit: r759000 - in /incubator/cassandra/trunk/src/org/apache/cassandra:
db/ColumnFamilyStore.java db/FileStruct.java db/FileStructComparator.java
io/Coordinate.java io/SSTable.java
Author: jbellis
Date: Fri Mar 27 02:44:49 2009
New Revision: 759000
URL: http://svn.apache.org/viewvc?rev=759000&view=rev
Log:
Clean up FileStruct and make it iterable. (This will be used by range queries.)
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=759000&r1=758999&r2=759000&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 02:44:49 2009
@@ -675,13 +675,9 @@
{
try
{
- fs = new FileStruct();
- fs.bufIn_ = new DataInputBuffer();
- fs.bufOut_ = new DataOutputBuffer();
- fs.reader_ = SequenceFile.bufferedReader(file, bufferSize);
- fs.key_ = null;
- fs = getNextKey(fs);
- if(fs == null)
+ fs = new FileStruct(SequenceFile.bufferedReader(file, bufferSize));
+ fs.getNextKey();
+ if(fs.isExhausted())
continue;
pq.add(fs);
}
@@ -690,9 +686,9 @@
ex.printStackTrace();
try
{
- if(fs != null)
+ if (fs != null)
{
- fs.reader_.close();
+ fs.close();
}
}
catch(Exception e)
@@ -897,38 +893,6 @@
}
- /*
- * Read the next key from the data file , this fn will skip teh block index
- * and read teh next available key into the filestruct that is passed.
- * If it cannot read or a end of file is reached it will return null.
- */
- FileStruct getNextKey(FileStruct filestruct) throws IOException
- {
- filestruct.bufOut_.reset();
- if (filestruct.reader_.isEOF())
- {
- filestruct.reader_.close();
- return null;
- }
-
- long bytesread = filestruct.reader_.next(filestruct.bufOut_);
- if (bytesread == -1)
- {
- filestruct.reader_.close();
- return null;
- }
-
- filestruct.bufIn_.reset(filestruct.bufOut_.getData(), filestruct.bufOut_.getLength());
- filestruct.key_ = filestruct.bufIn_.readUTF();
- /* If the key we read is the Block Index Key then we are done reading the keys so exit */
- if ( filestruct.key_.equals(SSTable.blockIndexKey_) )
- {
- filestruct.reader_.close();
- return null;
- }
- return filestruct;
- }
-
void forceCleanup()
{
MinorCompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
@@ -1057,11 +1021,11 @@
fs = pq.poll();
}
if (fs != null
- && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
+ && (lastkey == null || lastkey.compareTo(fs.getKey()) == 0))
{
// The keys are the same so we need to add this to the
// ldfs list
- lastkey = fs.key_;
+ lastkey = fs.getKey();
lfs.add(fs);
}
else
@@ -1076,9 +1040,9 @@
try
{
/* read the length although we don't need it */
- filestruct.bufIn_.readInt();
+ filestruct.getBufIn().readInt();
// Skip the Index
- IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
+ IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
// We want to add only 2 and resolve them right there in order to save on memory footprint
if(columnFamilies.size() > 1)
{
@@ -1086,7 +1050,7 @@
merge(columnFamilies);
}
// deserialize into column families
- columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
}
catch ( Exception ex)
{
@@ -1108,13 +1072,13 @@
try
{
/* read the length although we don't need it */
- int size = filestruct.bufIn_.readInt();
- bufOut.write(filestruct.bufIn_, size);
+ int size = filestruct.getBufIn().readInt();
+ bufOut.write(filestruct.getBufIn(), size);
}
catch ( Exception ex)
{
logger_.warn(LogUtil.throwableToString(ex));
- filestruct.reader_.close();
+ filestruct.close();
continue;
}
}
@@ -1142,16 +1106,16 @@
{
try
{
- filestruct = getNextKey ( filestruct );
- if(filestruct == null)
+ filestruct.getNextKey();
+ if (filestruct.isExhausted())
{
continue;
}
/* keep on looping until we find a key in the range */
- while ( !Range.isKeyInRanges(filestruct.key_, ranges) )
+ while ( !Range.isKeyInRanges(filestruct.getKey(), ranges) )
{
- filestruct = getNextKey ( filestruct );
- if(filestruct == null)
+ filestruct.getNextKey();
+ if (filestruct.isExhausted())
{
break;
}
@@ -1163,7 +1127,7 @@
//break;
//}
}
- if ( filestruct != null)
+ if (!filestruct.isExhausted())
{
pq.add(filestruct);
}
@@ -1175,7 +1139,7 @@
// in any case we have read as far as possible from it
// and it will be deleted after compaction.
logger_.warn(LogUtil.throwableToString(ex));
- filestruct.reader_.close();
+ filestruct.close();
}
}
lfs.clear();
@@ -1270,11 +1234,11 @@
fs = pq.poll();
}
if (fs != null
- && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
+ && (lastkey == null || lastkey.compareTo(fs.getKey()) == 0))
{
// The keys are the same so we need to add this to the
// ldfs list
- lastkey = fs.key_;
+ lastkey = fs.getKey();
lfs.add(fs);
}
else
@@ -1289,16 +1253,16 @@
try
{
/* read the length although we don't need it */
- filestruct.bufIn_.readInt();
+ filestruct.getBufIn().readInt();
// Skip the Index
- IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
+ IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
// We want to add only 2 and resolve them right there in order to save on memory footprint
if(columnFamilies.size() > 1)
{
merge(columnFamilies);
}
// deserialize into column families
- columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
}
catch ( Exception ex)
{
@@ -1320,13 +1284,13 @@
try
{
/* read the length although we don't need it */
- int size = filestruct.bufIn_.readInt();
- bufOut.write(filestruct.bufIn_, size);
+ int size = filestruct.getBufIn().readInt();
+ bufOut.write(filestruct.getBufIn(), size);
}
catch ( Exception ex)
{
ex.printStackTrace();
- filestruct.reader_.close();
+ filestruct.close();
continue;
}
}
@@ -1344,8 +1308,8 @@
{
try
{
- filestruct = getNextKey(filestruct);
- if(filestruct == null)
+ filestruct.getNextKey();
+ if (filestruct.isExhausted())
{
continue;
}
@@ -1357,7 +1321,7 @@
// Ignore the exception as it might be a corrupted file
// in any case we have read as far as possible from it
// and it will be deleted after compaction.
- filestruct.reader_.close();
+ filestruct.close();
}
}
lfs.clear();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java?rev=759000&r1=758999&r2=759000&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java Fri Mar 27 02:44:49 2009
@@ -19,92 +19,189 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.util.Iterator;
+import org.apache.cassandra.io.Coordinate;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.IFileReader;
import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.io.SequenceFile;
-import org.apache.cassandra.service.StorageService;
-public class FileStruct implements Comparable<FileStruct>
+public class FileStruct implements Comparable<FileStruct>, Iterable<String>
{
- IFileReader reader_;
- String key_;
- DataInputBuffer bufIn_;
- DataOutputBuffer bufOut_;
- public FileStruct()
+ private String key = null;
+ private boolean exhausted = false;
+ private IFileReader reader;
+ private DataInputBuffer bufIn;
+ private DataOutputBuffer bufOut;
+
+ public FileStruct(IFileReader reader)
{
+ this.reader = reader;
+ bufIn = new DataInputBuffer();
+ bufOut = new DataOutputBuffer();
}
-
- public FileStruct(String file, int bufSize) throws IOException
+
+ public String getFileName()
{
- bufIn_ = new DataInputBuffer();
- bufOut_ = new DataOutputBuffer();
- reader_ = SequenceFile.bufferedReader(file, bufSize);
- long bytesRead = advance();
- if ( bytesRead == -1L )
- throw new IOException("Either the file is empty or EOF has been reached.");
+ return reader.getFileName();
}
-
+
+ public void close() throws IOException
+ {
+ reader.close();
+ }
+
+ public boolean isExhausted()
+ {
+ return exhausted;
+ }
+
+ public DataInputBuffer getBufIn()
+ {
+ return bufIn;
+ }
+
public String getKey()
{
- return key_;
+ return key;
}
-
- public DataOutputBuffer getBuffer()
+
+ public int compareTo(FileStruct f)
{
- return bufOut_;
+ return key.compareTo(f.key);
}
-
- public long advance() throws IOException
- {
- long bytesRead = -1L;
- bufOut_.reset();
- /* advance and read the next key in the file. */
- if (reader_.isEOF())
+
+ // We don't use SequenceReader.seekTo, since it (sometimes) throws an exception
+ // if the key is not found; it is unclear whether that behavior is desired.
+ public void seekTo(String seekKey)
+ {
+ try
+ {
+ Coordinate range = SSTable.getCoordinates(seekKey, reader);
+ reader.seek(range.end_);
+ long position = reader.getPositionFromBlockIndex(seekKey);
+ if (position == -1)
+ {
+ reader.seek(range.start_);
+ }
+ else
+ {
+ reader.seek(position);
+ }
+
+ while (!exhausted)
+ {
+ getNextKey();
+ if (key.compareTo(seekKey) >= 0)
+ {
+ break;
+ }
+ }
+ }
+ catch (IOException e)
{
- reader_.close();
- return bytesRead;
+ throw new RuntimeException("corrupt sstable", e);
}
-
- bytesRead = reader_.next(bufOut_);
- if (bytesRead == -1)
+ }
+
+ /*
+ * Read the next key from the data file, skipping block indexes.
+ * Caller must check isExhausted after each call to see if further
+ * reads are valid.
+ */
+ public void getNextKey()
+ {
+ if (exhausted)
{
- reader_.close();
- return bytesRead;
+ throw new IndexOutOfBoundsException();
}
- bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
- key_ = bufIn_.readUTF();
- /* If the key we read is the Block Index Key then omit and read the next key. */
- if ( key_.equals(SSTable.blockIndexKey_) )
+ try
{
- bufOut_.reset();
- bytesRead = reader_.next(bufOut_);
- if (bytesRead == -1)
+ bufOut.reset();
+ if (reader.isEOF())
+ {
+ reader.close();
+ exhausted = true;
+ return;
+ }
+
+ long bytesread = reader.next(bufOut);
+ if (bytesread == -1)
{
- reader_.close();
- return bytesRead;
+ reader.close();
+ exhausted = true;
+ return;
}
- bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
- key_ = bufIn_.readUTF();
+
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ key = bufIn.readUTF();
+ /* If the key we read is the Block Index Key then omit and read the next key. */
+ if (key.equals(SSTable.blockIndexKey_))
+ {
+ bufOut.reset();
+ bytesread = reader.next(bufOut);
+ if (bytesread == -1)
+ {
+ reader.close();
+ exhausted = true;
+ return;
+ }
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ key = bufIn.readUTF();
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
}
-
- return bytesRead;
}
- public int compareTo(FileStruct f)
+ public Iterator<String> iterator()
{
- return StorageService.getPartitioner().getDecoratedKeyComparator().compare(key_, f.key_);
+ return new FileStructIterator();
}
-
- public void close() throws IOException
+
+ private class FileStructIterator implements Iterator<String>
{
- bufIn_.close();
- bufOut_.close();
- reader_.close();
+ String saved;
+
+ public FileStructIterator()
+ {
+ if (getKey() == null && !isExhausted())
+ {
+ forward();
+ }
+ }
+
+ private void forward()
+ {
+ getNextKey();
+ saved = isExhausted() ? null : getKey();
+ }
+
+ public boolean hasNext()
+ {
+ return saved != null;
+ }
+
+ public String next()
+ {
+ if (saved == null)
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ String key = saved;
+ forward();
+ return key;
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java?rev=759000&r1=758999&r2=759000&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java Fri Mar 27 02:44:49 2009
@@ -6,13 +6,6 @@
{
public int compare(FileStruct f, FileStruct f2)
{
- return f.reader_.getFileName().compareTo(f2.reader_.getFileName());
- }
-
- public boolean equals(Object o)
- {
- if (!(o instanceof FileStructComparator))
- return false;
- return true;
+ return f.getFileName().compareTo(f2.getFileName());
}
}
\ No newline at end of file
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java?rev=759000&r1=758999&r2=759000&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/Coordinate.java Fri Mar 27 02:44:49 2009
@@ -21,10 +21,10 @@
* Section of a file that needs to be scanned
* is represented by this class.
*/
-class Coordinate
+public class Coordinate
{
- long start_;
- long end_;
+ public final long start_;
+ public final long end_;
Coordinate(long start, long end)
{
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java?rev=759000&r1=758999&r2=759000&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java Fri Mar 27 02:44:49 2009
@@ -653,9 +653,9 @@
afterAppend(decoratedKey, currentPosition, value.length );
}
- private Coordinate getCoordinates(String decoratedKey, IFileReader dataReader) throws IOException
+ public static Coordinate getCoordinates(String decoratedKey, IFileReader dataReader) throws IOException
{
- List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataFile_);
+ List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataReader.getFileName());
int size = (indexInfo == null) ? 0 : indexInfo.size();
long start = 0L;
long end = dataReader.getEOF();