You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/12 15:02:54 UTC

[2/5] cassandra git commit: Ensure SSTableReader.last corresponds exactly with the file end

Ensure SSTableReader.last corresponds exactly with the file end

patch by benedict; reviewed by marcus for CASSANDRA-8750


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ced7a34
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ced7a34
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ced7a34

Branch: refs/heads/trunk
Commit: 0ced7a345cf5b5dd7da27f7dfb51aad933b4f21c
Parents: 4eb9fa7
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Feb 12 13:45:19 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Feb 12 13:45:19 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compress/CompressedRandomAccessReader.java  |  2 +-
 .../io/compress/CompressedSequentialWriter.java | 11 +++-
 .../io/compress/CompressionMetadata.java        | 58 ++++++++++++++----
 .../cassandra/io/sstable/SSTableReader.java     |  6 +-
 .../cassandra/io/sstable/SSTableWriter.java     |  8 +--
 .../io/util/BufferedPoolingSegmentedFile.java   |  5 +-
 .../io/util/BufferedSegmentedFile.java          |  5 +-
 .../io/util/CompressedPoolingSegmentedFile.java |  5 +-
 .../io/util/CompressedSegmentedFile.java        |  9 +--
 .../cassandra/io/util/MmappedSegmentedFile.java | 19 ++++--
 .../cassandra/io/util/PoolingSegmentedFile.java |  2 +-
 .../cassandra/io/util/RandomAccessReader.java   | 63 ++++++++++++++------
 .../apache/cassandra/io/util/SegmentedFile.java | 19 ++++--
 .../cassandra/io/util/ThrottledReader.java      |  8 +--
 15 files changed, 152 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cbb4334..bbbf9a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
  * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
  * Enforce SSTableReader.first/last (CASSANDRA-8744)
  * Cleanup SegmentedFile API (CASSANDRA-8749)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 49dcd3d..e29ad33 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -67,7 +67,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
     protected CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
     {
-        super(new File(dataFilePath), metadata.chunkLength(), owner);
+        super(new File(dataFilePath), metadata.chunkLength(), metadata.compressedFileLength, owner);
         this.metadata = metadata;
         checksum = metadata.hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
         compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 87eb2fb..ea0d785 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -33,6 +33,10 @@ import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
 
+import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.FINAL;
+import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.SHARED;
+import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.SHARED_FINAL;
+
 public class CompressedSequentialWriter extends SequentialWriter
 {
     private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
@@ -142,10 +146,11 @@ public class CompressedSequentialWriter extends SequentialWriter
             runPostFlush.run();
     }
 
-    public CompressionMetadata open(SSTableWriter.FinishType finishType)
+    public CompressionMetadata open(long overrideLength, boolean isFinal)
     {
-        assert finishType != SSTableWriter.FinishType.NORMAL || current == originalSize;
-        return metadataWriter.open(originalSize, chunkOffset, finishType);
+        if (overrideLength <= 0)
+            return metadataWriter.open(originalSize, chunkOffset, isFinal ? FINAL : SHARED_FINAL);
+        return metadataWriter.open(overrideLength, chunkOffset, SHARED);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index aaf1656..ad087c7 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -322,18 +322,56 @@ public class CompressionMetadata
             }
         }
 
-        public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType)
+        static enum OpenType
         {
-            RefCountedMemory offsets;
-            if (finishType.isFinal)
-            {
-                // we now know how many offsets we have and can resize the offsets properly
-                offsets = this.offsets.copy(count * 8L);
-                this.offsets.unreference();
-            }
-            else
+            // i.e. FinishType == EARLY; we will use the RefCountedMemory in possibly multiple instances
+            SHARED,
+            // i.e. FinishType == EARLY, but the sstable has been completely written, so we can
+            // finalise the contents and size of the memory, but must retain a reference to it
+            SHARED_FINAL,
+            // i.e. FinishType == NORMAL or FINISH_EARLY, i.e. we have actually finished writing the table
+            // and will never need to open the metadata again, so we can release any references to it here
+            FINAL
+        }
+
+        public CompressionMetadata open(long dataLength, long compressedLength, OpenType type)
+        {
+            RefCountedMemory offsets = this.offsets;
+            int count = this.count;
+            switch (type)
             {
-                offsets = this.offsets;
+                case FINAL: case SHARED_FINAL:
+                    // maybe resize the data
+                    if (this.offsets.size() != count * 8L)
+                    {
+                        offsets = this.offsets.copy(count * 8L);
+                        // release our reference to the original shared data;
+                        // we don't do this if not resizing since we must pass out existing
+                        // reference onto our caller
+                        this.offsets.unreference();
+                    }
+                    // null out our reference to the original shared data to catch accidental reuse
+                    this.offsets = null;
+                    if (type == OpenType.SHARED_FINAL)
+                    {
+                        // we will use the data again, so stash our resized data back, and take an extra reference to it
+                        this.offsets = offsets;
+                        this.offsets.reference();
+                    }
+                    break;
+
+                case SHARED:
+
+                    // we should only be opened on a compression data boundary; truncate our size to this boundary
+                    assert dataLength % parameters.chunkLength() == 0;
+                    count = (int) (dataLength / parameters.chunkLength());
+                    // grab our actual compressed length from the offset of the first chunk beyond the position we're opened to
+                    if (count < this.count)
+                        compressedLength = offsets.getLong(count * 8);
+                    break;
+
+                default:
+                    throw new AssertionError();
             }
             return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 5abd1b7..202bc4d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1314,11 +1314,7 @@ public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
 
             long left = getPosition(leftBound, Operator.GT).position;
             long right = (rightBound.compareTo(last) > 0)
-                         ? (openReason == OpenReason.EARLY
-                            // if opened early, we overlap with the old sstables by one key, so we know that the last
-                            // (and further) key(s) will be streamed from these if necessary
-                            ? getPosition(last, Operator.GT, false, true).position
-                            : uncompressedLength())
+                         ? uncompressedLength()
                          : getPosition(rightBound, Operator.GT).position;
 
             if (left == right)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 2c1cf0e..b67685d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -398,8 +398,8 @@ public class SSTableWriter extends SSTable
         assert boundary.indexLength > 0 && boundary.dataLength > 0;
         Descriptor link = makeTmpLinks();
         // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
-        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
-        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
+        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), boundary.indexLength);
+        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), boundary.dataLength);
         SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
                                                            components, metadata,
                                                            partitioner, ifile,
@@ -451,8 +451,8 @@ public class SSTableWriter extends SSTable
             desc = makeTmpLinks();
 
         // finalize in-memory state for the reader
-        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
-        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
+        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType.isFinal);
+        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType.isFinal);
         SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
                                                            components,
                                                            this.metadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index e4c363a..f04a1fb 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -45,9 +45,10 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
+        public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
         {
-            long length = new File(path).length();
+            assert !isFinal || overrideLength <= 0;
+            long length = overrideLength > 0 ? overrideLength : new File(path).length();
             return new BufferedPoolingSegmentedFile(path, length);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index c29bbf3..1a1d208 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -53,9 +53,10 @@ public class BufferedSegmentedFile extends SegmentedFile
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
+        public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
         {
-            long length = new File(path).length();
+            assert !isFinal || overrideLength <= 0;
+            long length = overrideLength > 0 ? overrideLength : new File(path).length();
             return new BufferedSegmentedFile(path, length);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index c514b80..40a54dc 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -23,7 +23,6 @@ import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 
 public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
 {
@@ -68,9 +67,9 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
+        public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
         {
-            return new CompressedPoolingSegmentedFile(path, metadata(path, finishType));
+            return new CompressedPoolingSegmentedFile(path, metadata(path, overrideLength, isFinal));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 6b5c2e1..9721bc3 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -73,17 +73,18 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
             // only one segment in a standard-io file
         }
 
-        protected CompressionMetadata metadata(String path, SSTableWriter.FinishType finishType)
+        protected CompressionMetadata metadata(String path, long overrideLength, boolean isFinal)
         {
             if (writer == null)
                 return CompressionMetadata.create(path);
 
-            return writer.open(finishType);
+            return writer.open(overrideLength, isFinal);
         }
 
-        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
+        public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
         {
-            return new CompressedSegmentedFile(path, metadata(path, finishType));
+            assert !isFinal || overrideLength <= 0;
+            return new CompressedSegmentedFile(path, metadata(path, overrideLength, isFinal));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 8067c68..1b23343 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -183,31 +183,38 @@ public class MmappedSegmentedFile extends SegmentedFile
             }
         }
 
-        public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
+        public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
         {
-            long length = new File(path).length();
+            assert !isFinal || overrideLength <= 0;
+            long length = overrideLength > 0 ? overrideLength : new File(path).length();
             // create the segments
-            return new MmappedSegmentedFile(path, length, createSegments(path));
+            return new MmappedSegmentedFile(path, length, createSegments(path, length));
         }
 
-        private Segment[] createSegments(String path)
+        private Segment[] createSegments(String path, long length)
         {
             RandomAccessFile raf;
-            long length;
             try
             {
                 raf = new RandomAccessFile(path, "r");
-                length = raf.length();
             }
             catch (IOException e)
             {
                 throw new RuntimeException(e);
             }
 
+            // if we're early finishing a range that doesn't span multiple segments, but the finished file now does,
+            // we remove these from the end (we loop incase somehow this spans multiple segments, but that would
+            // be a loco dataset
+            while (length < boundaries.get(boundaries.size() - 1))
+                boundaries.remove(boundaries.size() -1);
+
             // add a sentinel value == length
             List<Long> boundaries = new ArrayList<>(this.boundaries);
             if (length != boundaries.get(boundaries.size() - 1))
                 boundaries.add(length);
+
+
             int segcount = boundaries.size() - 1;
             Segment[] segments = new Segment[segcount];
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index d3c90c7..4ab98af 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -67,7 +67,7 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
 
     protected RandomAccessReader createPooledReader()
     {
-        return RandomAccessReader.open(new File(path), this);
+        return RandomAccessReader.open(new File(path), length, this);
     }
 
     public void recycle(RandomAccessReader reader)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 81e45b5..df68ca3 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -55,6 +55,10 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
 
     protected RandomAccessReader(File file, int bufferSize, PoolingSegmentedFile owner) throws FileNotFoundException
     {
+        this(file, bufferSize, -1, owner);
+    }
+    protected RandomAccessReader(File file, int bufferSize, long overrideLength, PoolingSegmentedFile owner) throws FileNotFoundException
+    {
         super(file, "r");
 
         this.owner = owner;
@@ -69,33 +73,49 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         buffer = new byte[bufferSize];
 
         // we can cache file length in read-only mode
-        try
-        {
-            fileLength = channel.size();
-        }
-        catch (IOException e)
+        long fileLength = overrideLength;
+        if (fileLength <= 0)
         {
-            throw new FSReadError(e, filePath);
+            try
+            {
+                fileLength = channel.size();
+            }
+            catch (IOException e)
+            {
+                throw new FSReadError(e, filePath);
+            }
         }
+
+        this.fileLength = fileLength;
         validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations
     }
 
-    public static RandomAccessReader open(File file, PoolingSegmentedFile owner)
+    public static RandomAccessReader open(File file, long overrideSize, PoolingSegmentedFile owner)
     {
-        return open(file, DEFAULT_BUFFER_SIZE, owner);
+        return open(file, DEFAULT_BUFFER_SIZE, overrideSize, owner);
     }
 
     public static RandomAccessReader open(File file)
     {
-        return open(file, DEFAULT_BUFFER_SIZE, null);
+        return open(file, -1L);
+    }
+
+    public static RandomAccessReader open(File file, long overrideSize)
+    {
+        return open(file, DEFAULT_BUFFER_SIZE, overrideSize, null);
     }
 
     @VisibleForTesting
     static RandomAccessReader open(File file, int bufferSize, PoolingSegmentedFile owner)
     {
+        return open(file, bufferSize, -1L, owner);
+    }
+
+    private static RandomAccessReader open(File file, int bufferSize, long overrideSize, PoolingSegmentedFile owner)
+    {
         try
         {
-            return new RandomAccessReader(file, bufferSize, owner);
+            return new RandomAccessReader(file, bufferSize, overrideSize, owner);
         }
         catch (FileNotFoundException e)
         {
@@ -118,22 +138,27 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
 
         try
         {
-            if (bufferOffset >= channel.size())
-                return;
+            int read = buffer.length;
+            if (bufferOffset + read > fileLength)
+            {
+                if (bufferOffset >= fileLength)
+                    return;
+                read = (int) (fileLength - bufferOffset);
+            }
 
             channel.position(bufferOffset); // setting channel position
 
-            int read = 0;
-
-            while (read < buffer.length)
+            int offset = 0;
+            while (read > 0)
             {
-                int n = super.read(buffer, read, buffer.length - read);
+                int n = super.read(buffer, offset, read);
                 if (n < 0)
-                    break;
-                read += n;
+                    throw new IllegalStateException();
+                read -= n;
+                offset += n;
             }
 
-            validBufferBytes = read;
+            validBufferBytes = offset;
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 510ed81..146494d 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.RefCounted;
 import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
@@ -96,13 +95,13 @@ public abstract class SegmentedFile extends SharedCloseableImpl
 
     public RandomAccessReader createReader()
     {
-        return RandomAccessReader.open(new File(path));
+        return RandomAccessReader.open(new File(path), length);
     }
 
     public RandomAccessReader createThrottledReader(RateLimiter limiter)
     {
         assert limiter != null;
-        return ThrottledReader.open(new File(path), limiter);
+        return ThrottledReader.open(new File(path), length, limiter);
     }
 
     public FileDataInput getSegment(long position)
@@ -156,11 +155,21 @@ public abstract class SegmentedFile extends SharedCloseableImpl
          * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
          * @param path The file on disk.
          */
-        public abstract SegmentedFile complete(String path, SSTableWriter.FinishType openType);
+        protected abstract SegmentedFile complete(String path, long overrideLength, boolean isFinal);
 
         public SegmentedFile complete(String path)
         {
-            return complete(path, SSTableWriter.FinishType.NORMAL);
+            return complete(path, -1, true);
+        }
+
+        public SegmentedFile complete(String path, boolean isFinal)
+        {
+            return complete(path, -1, isFinal);
+        }
+
+        public SegmentedFile complete(String path, long overrideLength)
+        {
+            return complete(path, overrideLength, false);
         }
 
         public void serializeBounds(DataOutput out) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
index b12a8a8..c4845c5 100644
--- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
@@ -30,9 +30,9 @@ public class ThrottledReader extends RandomAccessReader
 {
     private final RateLimiter limiter;
 
-    protected ThrottledReader(File file, RateLimiter limiter) throws FileNotFoundException
+    protected ThrottledReader(File file, long overrideLength, RateLimiter limiter) throws FileNotFoundException
     {
-        super(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, null);
+        super(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, overrideLength, null);
         this.limiter = limiter;
     }
 
@@ -42,11 +42,11 @@ public class ThrottledReader extends RandomAccessReader
         super.reBuffer();
     }
 
-    public static ThrottledReader open(File file, RateLimiter limiter)
+    public static ThrottledReader open(File file, long overrideLength, RateLimiter limiter)
     {
         try
         {
-            return new ThrottledReader(file, limiter);
+            return new ThrottledReader(file, overrideLength, limiter);
         }
         catch (FileNotFoundException e)
         {