You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/07 03:59:35 UTC
svn commit: r896743 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ db/filter/
io/ io/util/
Author: jbellis
Date: Thu Jan 7 02:59:28 2010
New Revision: 896743
URL: http://svn.apache.org/viewvc?rev=896743&view=rev
Log:
instead of providing a RandomAccessFile-like interface in FileDataInput implementing seek and trying to keep people from shooting themselves in the foot by forgetting that it may only represent a 2GB segment of a larger file, provide an InputStream-like interface emphasizing mark/reset
patch by jbellis; tested by Brandon Williams for CASSANDRA-669
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Jan 7 02:59:28 2010
@@ -297,7 +297,7 @@
logger_.debug("Replaying " + file + " starting at " + lowPos);
/* read the logs populate RowMutation and apply */
- while (reader.getFilePointer() < reader.length())
+ while (!reader.isEOF())
{
if (logger_.isDebugEnabled())
logger_.debug("Reading mutation at " + reader.getFilePointer());
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java Thu Jan 7 02:59:28 2010
@@ -90,14 +90,13 @@
ranges.add(indexInfo);
}
- /* seek to the correct offset to the data */
- long columnBegin = file.getFilePointer();
- /* now read all the columns from the ranges */
+ file.mark();
for (IndexHelper.IndexInfo indexInfo : ranges)
{
- file.seek(columnBegin + indexInfo.offset);
+ file.reset();
+ assert file.skipBytes((int)indexInfo.offset) == indexInfo.offset;
// TODO only completely deserialize columns we are interested in
- while (file.getFilePointer() < columnBegin + indexInfo.offset + indexInfo.width)
+ while (file.bytesPastMark() < indexInfo.offset + indexInfo.width)
{
final IColumn column = cf.getColumnSerializer().deserialize(file);
// we check vs the original Set, not the filtered List, for efficiency
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Thu Jan 7 02:59:28 2010
@@ -113,7 +113,6 @@
private final ColumnFamily emptyColumnFamily;
private final List<IndexHelper.IndexInfo> indexes;
- private final long columnStartPosition;
private final FileDataInput file;
private int curRangeIndex;
@@ -133,7 +132,7 @@
emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ssTable.makeColumnFamily(), file);
file.readInt(); // column count
- columnStartPosition = file.getFilePointer();
+ file.mark();
curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed);
if (reversed && curRangeIndex == indexes.size())
curRangeIndex--;
@@ -186,8 +185,9 @@
boolean outOfBounds = false;
- file.seek(columnStartPosition + curColPosition.offset);
- while (file.getFilePointer() < columnStartPosition + curColPosition.offset + curColPosition.width && !outOfBounds)
+ file.reset();
+ assert file.skipBytes((int)curColPosition.offset) == curColPosition.offset;
+ while (file.bytesPastMark() < curColPosition.offset + curColPosition.width && !outOfBounds)
{
IColumn column = emptyColumnFamily.getColumnSerializer().deserialize(file);
if (reversed)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Thu Jan 7 02:59:28 2010
@@ -72,12 +72,12 @@
ArrayList<IndexInfo> indexList = new ArrayList<IndexInfo>();
int columnIndexSize = in.readInt();
- long start = in.getFilePointer();
- while (in.getFilePointer() < start + columnIndexSize)
+ in.mark();
+ while (in.bytesPastMark() < columnIndexSize)
{
indexList.add(IndexInfo.deserialize(in));
}
- assert in.getFilePointer() == start + columnIndexSize;
+ assert in.bytesPastMark() == columnIndexSize;
return indexList;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java Thu Jan 7 02:59:28 2010
@@ -27,6 +27,7 @@
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.io.util.FileDataInput;
import com.google.common.collect.AbstractIterator;
@@ -35,18 +36,16 @@
{
private final DecoratedKey key;
private final long finishedAt;
- private final FileDataInput file;
+ private final BufferedRandomAccessFile file;
private SSTableReader sstable;
private long dataStart;
- private final IPartitioner partitioner;
- public IteratingRow(FileDataInput file, SSTableReader sstable) throws IOException
+ public IteratingRow(BufferedRandomAccessFile file, SSTableReader sstable) throws IOException
{
this.file = file;
this.sstable = sstable;
- this.partitioner = StorageService.getPartitioner();
- key = partitioner.convertFromDiskFormat(file.readUTF());
+ key = StorageService.getPartitioner().convertFromDiskFormat(file.readUTF());
int dataSize = file.readInt();
dataStart = file.getFilePointer();
finishedAt = dataStart + dataSize;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Jan 7 02:59:28 2010
@@ -395,8 +395,8 @@
FileDataInput input;
if (indexBuffers == null)
{
- input = new BufferedRandomAccessFile(indexFilename(), "r");
- input.seek(p);
+ input = new BufferedRandomAccessFile(path, "r");
+ ((BufferedRandomAccessFile)input).seek(p);
}
else
{
@@ -421,7 +421,7 @@
if (v == 0)
{
PositionSize info;
- if (input.getFilePointer() < input.length())
+ if (!input.isEOF())
{
int utflen = input.readUnsignedShort();
if (utflen != input.skipBytes(utflen))
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java Thu Jan 7 02:59:28 2010
@@ -35,10 +35,10 @@
{
private static Logger logger = Logger.getLogger(SSTableScanner.class);
+ private final BufferedRandomAccessFile file;
+ private final SSTableReader sstable;
private IteratingRow row;
private boolean exhausted = false;
- private FileDataInput file;
- private SSTableReader sstable;
private Iterator<IteratingRow> iterator;
/**
@@ -116,7 +116,9 @@
{
try
{
- return (row == null && file.getFilePointer() < file.length()) || row.getEndPosition() < file.length();
+ if (row == null)
+ return !file.isEOF();
+ return row.getEndPosition() < file.length();
}
catch (IOException e)
{
@@ -130,7 +132,7 @@
{
if (row != null)
row.skipRemaining();
- assert file.getFilePointer() < file.length();
+ assert !file.isEOF();
return row = new IteratingRow(file, sstable);
}
catch (IOException e)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Thu Jan 7 02:59:28 2010
@@ -56,6 +56,7 @@
private long maxHi_; // this.lo + this.buff.length
private boolean hitEOF_; // buffer contains last file block?
private long diskPos_; // disk position
+ private long markedPointer;
/*
* To describe the above fields, we introduce the following abstractions for
@@ -391,4 +392,28 @@
this.curr_ += len;
return len;
}
+
+ public boolean isEOF() throws IOException
+ {
+ return getFilePointer() == length();
+ }
+
+ public void mark()
+ {
+ markedPointer = getFilePointer();
+ }
+
+ public void reset() throws IOException
+ {
+ seek(markedPointer);
+ }
+
+ public int bytesPastMark()
+ {
+ long bytes = getFilePointer() - markedPointer;
+ assert bytes >= 0;
+ if (bytes > Integer.MAX_VALUE)
+ throw new UnsupportedOperationException("Overflow: " + bytes);
+ return (int) bytes;
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java Thu Jan 7 02:59:28 2010
@@ -6,11 +6,13 @@
public interface FileDataInput extends DataInput, Closeable
{
- public void seek(long pos) throws IOException;
+ public String getPath();
- public long length() throws IOException;
+ public boolean isEOF() throws IOException;
- public long getFilePointer();
+ public void mark();
- public String getPath();
+ public void reset() throws IOException;
+
+ public int bytesPastMark();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Thu Jan 7 02:59:28 2010
@@ -8,7 +8,7 @@
private final MappedByteBuffer buffer;
private final String filename;
private int position;
- private long fileLength;
+ private int markedPosition;
public MappedFileDataInput(MappedByteBuffer buffer, String filename)
{
@@ -21,25 +21,46 @@
this.buffer = buffer;
this.filename = filename;
this.position = position;
- assert (fileLength = new File(filename).length()) >= 0; // hack to only initialize fL when assertions are enabled
}
- public void seek(long pos) throws IOException
+ // don't make this public, this is only for seeking WITHIN the current mapped segment
+ private void seekInternal(int pos) throws IOException
{
- assert pos <= Integer.MAX_VALUE;
- assert buffer.capacity() == fileLength; // calling this does not make sense on a mapped chunk of a larger file
- position = (int) pos;
+ position = pos;
}
- public long length() throws IOException
+ @Override
+ public boolean markSupported()
{
- assert buffer.capacity() == fileLength; // calling this does not make sense on a mapped chunk of a larger file
- return buffer.capacity();
+ return true;
}
- public long getFilePointer()
+ @Override
+ public void mark(int ignored)
{
- return position;
+ markedPosition = position;
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ seekInternal(markedPosition);
+ }
+
+ public void mark()
+ {
+ mark(-1);
+ }
+
+ public int bytesPastMark()
+ {
+ assert position >= markedPosition;
+ return position - markedPosition;
+ }
+
+ public boolean isEOF() throws IOException
+ {
+ return position == buffer.capacity();
}
public String getPath()
@@ -49,7 +70,7 @@
public int read() throws IOException
{
- if (position == length())
+ if (isEOF())
return -1;
return buffer.get(position++) & 0xFF;
}
@@ -252,7 +273,7 @@
public final String readLine() throws IOException {
StringBuilder line = new StringBuilder(80); // Typical line length
boolean foundTerminator = false;
- long unreadPosition = 0;
+ int unreadPosition = 0;
while (true) {
int nextByte = read();
switch (nextByte) {
@@ -260,18 +281,18 @@
return line.length() != 0 ? line.toString() : null;
case (byte) '\r':
if (foundTerminator) {
- seek(unreadPosition);
+ seekInternal(unreadPosition);
return line.toString();
}
foundTerminator = true;
/* Have to be able to peek ahead one byte */
- unreadPosition = getFilePointer();
+ unreadPosition = position;
break;
case (byte) '\n':
return line.toString();
default:
if (foundTerminator) {
- seek(unreadPosition);
+ seekInternal(unreadPosition);
return line.toString();
}
line.append((char) nextByte);