You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/07 03:59:35 UTC

svn commit: r896743 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ db/filter/ io/ io/util/

Author: jbellis
Date: Thu Jan  7 02:59:28 2010
New Revision: 896743

URL: http://svn.apache.org/viewvc?rev=896743&view=rev
Log:
Instead of providing a RandomAccessFile-like interface in FileDataInput that implements seek (and trying to keep people from shooting themselves in the foot by forgetting that it may represent only a 2GB segment of a larger file), provide an InputStream-like interface emphasizing mark/reset.
patch by jbellis; tested by Brandon Williams for CASSANDRA-669

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Jan  7 02:59:28 2010
@@ -297,7 +297,7 @@
                 logger_.debug("Replaying " + file + " starting at " + lowPos);
 
             /* read the logs populate RowMutation and apply */
-            while (reader.getFilePointer() < reader.length())
+            while (!reader.isEOF())
             {
                 if (logger_.isDebugEnabled())
                     logger_.debug("Reading mutation at " + reader.getFilePointer());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java Thu Jan  7 02:59:28 2010
@@ -90,14 +90,13 @@
                 ranges.add(indexInfo);
             }
 
-            /* seek to the correct offset to the data */
-            long columnBegin = file.getFilePointer();
-            /* now read all the columns from the ranges */
+            file.mark();
             for (IndexHelper.IndexInfo indexInfo : ranges)
             {
-                file.seek(columnBegin + indexInfo.offset);
+                file.reset();
+                assert file.skipBytes((int)indexInfo.offset) == indexInfo.offset;
                 // TODO only completely deserialize columns we are interested in
-                while (file.getFilePointer() < columnBegin + indexInfo.offset + indexInfo.width)
+                while (file.bytesPastMark() < indexInfo.offset + indexInfo.width)
                 {
                     final IColumn column = cf.getColumnSerializer().deserialize(file);
                     // we check vs the original Set, not the filtered List, for efficiency

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Thu Jan  7 02:59:28 2010
@@ -113,7 +113,6 @@
         private final ColumnFamily emptyColumnFamily;
 
         private final List<IndexHelper.IndexInfo> indexes;
-        private final long columnStartPosition;
         private final FileDataInput file;
 
         private int curRangeIndex;
@@ -133,7 +132,7 @@
             emptyColumnFamily = ColumnFamily.serializer().deserializeFromSSTableNoColumns(ssTable.makeColumnFamily(), file);
             file.readInt(); // column count
 
-            columnStartPosition = file.getFilePointer();
+            file.mark();
             curRangeIndex = IndexHelper.indexFor(startColumn, indexes, comparator, reversed);
             if (reversed && curRangeIndex == indexes.size())
                 curRangeIndex--;
@@ -186,8 +185,9 @@
 
             boolean outOfBounds = false;
 
-            file.seek(columnStartPosition + curColPosition.offset);
-            while (file.getFilePointer() < columnStartPosition + curColPosition.offset + curColPosition.width && !outOfBounds)
+            file.reset();
+            assert file.skipBytes((int)curColPosition.offset) == curColPosition.offset;
+            while (file.bytesPastMark() < curColPosition.offset + curColPosition.width && !outOfBounds)
             {
                 IColumn column = emptyColumnFamily.getColumnSerializer().deserialize(file);
                 if (reversed)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IndexHelper.java Thu Jan  7 02:59:28 2010
@@ -72,12 +72,12 @@
         ArrayList<IndexInfo> indexList = new ArrayList<IndexInfo>();
 
 		int columnIndexSize = in.readInt();
-        long start = in.getFilePointer();
-        while (in.getFilePointer() < start + columnIndexSize)
+        in.mark();
+        while (in.bytesPastMark() < columnIndexSize)
         {
             indexList.add(IndexInfo.deserialize(in));
         }
-        assert in.getFilePointer() == start + columnIndexSize;
+        assert in.bytesPastMark() == columnIndexSize;
 
         return indexList;
 	}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/IteratingRow.java Thu Jan  7 02:59:28 2010
@@ -27,6 +27,7 @@
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.io.util.FileDataInput;
 import com.google.common.collect.AbstractIterator;
@@ -35,18 +36,16 @@
 {
     private final DecoratedKey key;
     private final long finishedAt;
-    private final FileDataInput file;
+    private final BufferedRandomAccessFile file;
     private SSTableReader sstable;
     private long dataStart;
-    private final IPartitioner partitioner;
 
-    public IteratingRow(FileDataInput file, SSTableReader sstable) throws IOException
+    public IteratingRow(BufferedRandomAccessFile file, SSTableReader sstable) throws IOException
     {
         this.file = file;
         this.sstable = sstable;
-        this.partitioner = StorageService.getPartitioner();
 
-        key = partitioner.convertFromDiskFormat(file.readUTF());
+        key = StorageService.getPartitioner().convertFromDiskFormat(file.readUTF());
         int dataSize = file.readInt();
         dataStart = file.getFilePointer();
         finishedAt = dataStart + dataSize;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Jan  7 02:59:28 2010
@@ -395,8 +395,8 @@
         FileDataInput input;
         if (indexBuffers == null)
         {
-            input = new BufferedRandomAccessFile(indexFilename(), "r");
-            input.seek(p);
+            input = new BufferedRandomAccessFile(path, "r");
+            ((BufferedRandomAccessFile)input).seek(p);
         }
         else
         {
@@ -421,7 +421,7 @@
                 if (v == 0)
                 {
                     PositionSize info;
-                    if (input.getFilePointer() < input.length())
+                    if (!input.isEOF())
                     {
                         int utflen = input.readUnsignedShort();
                         if (utflen != input.skipBytes(utflen))

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableScanner.java Thu Jan  7 02:59:28 2010
@@ -35,10 +35,10 @@
 {
     private static Logger logger = Logger.getLogger(SSTableScanner.class);
 
+    private final BufferedRandomAccessFile file;
+    private final SSTableReader sstable;
     private IteratingRow row;
     private boolean exhausted = false;
-    private FileDataInput file;
-    private SSTableReader sstable;
     private Iterator<IteratingRow> iterator;
 
     /**
@@ -116,7 +116,9 @@
         {
             try
             {
-                return (row == null && file.getFilePointer() < file.length()) || row.getEndPosition() < file.length();
+                if (row == null)
+                    return !file.isEOF();
+                return row.getEndPosition() < file.length();
             }
             catch (IOException e)
             {
@@ -130,7 +132,7 @@
             {
                 if (row != null)
                     row.skipRemaining();
-                assert file.getFilePointer() < file.length();
+                assert !file.isEOF();
                 return row = new IteratingRow(file, sstable);
             }
             catch (IOException e)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Thu Jan  7 02:59:28 2010
@@ -56,6 +56,7 @@
     private long maxHi_; // this.lo + this.buff.length
     private boolean hitEOF_; // buffer contains last file block?
     private long diskPos_; // disk position
+    private long markedPointer;
 
     /*
     * To describe the above fields, we introduce the following abstractions for
@@ -391,4 +392,28 @@
         this.curr_ += len;
         return len;
     }
+
+    public boolean isEOF() throws IOException
+    {
+        return getFilePointer() == length();
+    }
+
+    public void mark()
+    {
+        markedPointer = getFilePointer();
+    }
+
+    public void reset() throws IOException
+    {
+        seek(markedPointer);
+    }
+
+    public int bytesPastMark()
+    {
+        long bytes = getFilePointer() - markedPointer;
+        assert bytes >= 0;
+        if (bytes > Integer.MAX_VALUE)
+            throw new UnsupportedOperationException("Overflow: " + bytes);
+        return (int) bytes;
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java Thu Jan  7 02:59:28 2010
@@ -6,11 +6,13 @@
 
 public interface FileDataInput extends DataInput, Closeable
 {
-    public void seek(long pos) throws IOException;
+    public String getPath();
 
-    public long length() throws IOException;
+    public boolean isEOF() throws IOException;
 
-    public long getFilePointer();
+    public void mark();
 
-    public String getPath();
+    public void reset() throws IOException;
+
+    public int bytesPastMark();
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=896743&r1=896742&r2=896743&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Thu Jan  7 02:59:28 2010
@@ -8,7 +8,7 @@
     private final MappedByteBuffer buffer;
     private final String filename;
     private int position;
-    private long fileLength;
+    private int markedPosition;
 
     public MappedFileDataInput(MappedByteBuffer buffer, String filename)
     {
@@ -21,25 +21,46 @@
         this.buffer = buffer;
         this.filename = filename;
         this.position = position;
-        assert (fileLength = new File(filename).length()) >= 0; // hack to only initialize fL when assertions are enabled
     }
 
-    public void seek(long pos) throws IOException
+    // don't make this public, this is only for seeking WITHIN the current mapped segment
+    private void seekInternal(int pos) throws IOException
     {
-        assert pos <= Integer.MAX_VALUE;
-        assert buffer.capacity() == fileLength; // calling this does not make sense on a mapped chunk of a larger file
-        position = (int) pos;
+        position = pos;
     }
 
-    public long length() throws IOException
+    @Override
+    public boolean markSupported()
     {
-        assert buffer.capacity() == fileLength; // calling this does not make sense on a mapped chunk of a larger file
-        return buffer.capacity();
+        return true;
     }
 
-    public long getFilePointer()
+    @Override
+    public void mark(int ignored)
     {
-        return position;
+        markedPosition = position;
+    }
+
+    @Override
+    public void reset() throws IOException
+    {
+        seekInternal(markedPosition);
+    }
+
+    public void mark()
+    {
+        mark(-1);
+    }
+
+    public int bytesPastMark()
+    {
+        assert position >= markedPosition;
+        return position - markedPosition;
+    }
+
+    public boolean isEOF() throws IOException
+    {
+        return position == buffer.capacity();
     }
 
     public String getPath()
@@ -49,7 +70,7 @@
 
     public int read() throws IOException
     {
-        if (position == length())
+        if (isEOF())
             return -1;
         return buffer.get(position++) & 0xFF;
     }
@@ -252,7 +273,7 @@
     public final String readLine() throws IOException {
         StringBuilder line = new StringBuilder(80); // Typical line length
         boolean foundTerminator = false;
-        long unreadPosition = 0;
+        int unreadPosition = 0;
         while (true) {
             int nextByte = read();
             switch (nextByte) {
@@ -260,18 +281,18 @@
                     return line.length() != 0 ? line.toString() : null;
                 case (byte) '\r':
                     if (foundTerminator) {
-                        seek(unreadPosition);
+                        seekInternal(unreadPosition);
                         return line.toString();
                     }
                     foundTerminator = true;
                     /* Have to be able to peek ahead one byte */
-                    unreadPosition = getFilePointer();
+                    unreadPosition = position;
                     break;
                 case (byte) '\n':
                     return line.toString();
                 default:
                     if (foundTerminator) {
-                        seek(unreadPosition);
+                        seekInternal(unreadPosition);
                         return line.toString();
                     }
                     line.append((char) nextByte);