You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/12 15:02:54 UTC
[2/5] cassandra git commit: Ensure SSTableReader.last corresponds
exactly with the file end
Ensure SSTableReader.last corresponds exactly with the file end
patch by benedict; reviewed by marcus for CASSANDRA-8750
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ced7a34
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ced7a34
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ced7a34
Branch: refs/heads/trunk
Commit: 0ced7a345cf5b5dd7da27f7dfb51aad933b4f21c
Parents: 4eb9fa7
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Feb 12 13:45:19 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Feb 12 13:45:19 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compress/CompressedRandomAccessReader.java | 2 +-
.../io/compress/CompressedSequentialWriter.java | 11 +++-
.../io/compress/CompressionMetadata.java | 58 ++++++++++++++----
.../cassandra/io/sstable/SSTableReader.java | 6 +-
.../cassandra/io/sstable/SSTableWriter.java | 8 +--
.../io/util/BufferedPoolingSegmentedFile.java | 5 +-
.../io/util/BufferedSegmentedFile.java | 5 +-
.../io/util/CompressedPoolingSegmentedFile.java | 5 +-
.../io/util/CompressedSegmentedFile.java | 9 +--
.../cassandra/io/util/MmappedSegmentedFile.java | 19 ++++--
.../cassandra/io/util/PoolingSegmentedFile.java | 2 +-
.../cassandra/io/util/RandomAccessReader.java | 63 ++++++++++++++------
.../apache/cassandra/io/util/SegmentedFile.java | 19 ++++--
.../cassandra/io/util/ThrottledReader.java | 8 +--
15 files changed, 152 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cbb4334..bbbf9a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.4
+ * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
* Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
* Enforce SSTableReader.first/last (CASSANDRA-8744)
* Cleanup SegmentedFile API (CASSANDRA-8749)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index 49dcd3d..e29ad33 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -67,7 +67,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
protected CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
{
- super(new File(dataFilePath), metadata.chunkLength(), owner);
+ super(new File(dataFilePath), metadata.chunkLength(), metadata.compressedFileLength, owner);
this.metadata = metadata;
checksum = metadata.hasPostCompressionAdlerChecksums ? new Adler32() : new CRC32();
compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 87eb2fb..ea0d785 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -33,6 +33,10 @@ import org.apache.cassandra.io.util.DataIntegrityMetadata;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.SequentialWriter;
+import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.FINAL;
+import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.SHARED;
+import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.SHARED_FINAL;
+
public class CompressedSequentialWriter extends SequentialWriter
{
private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
@@ -142,10 +146,11 @@ public class CompressedSequentialWriter extends SequentialWriter
runPostFlush.run();
}
- public CompressionMetadata open(SSTableWriter.FinishType finishType)
+ public CompressionMetadata open(long overrideLength, boolean isFinal)
{
- assert finishType != SSTableWriter.FinishType.NORMAL || current == originalSize;
- return metadataWriter.open(originalSize, chunkOffset, finishType);
+ if (overrideLength <= 0)
+ return metadataWriter.open(originalSize, chunkOffset, isFinal ? FINAL : SHARED_FINAL);
+ return metadataWriter.open(overrideLength, chunkOffset, SHARED);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index aaf1656..ad087c7 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -322,18 +322,56 @@ public class CompressionMetadata
}
}
- public CompressionMetadata open(long dataLength, long compressedLength, SSTableWriter.FinishType finishType)
+ static enum OpenType
{
- RefCountedMemory offsets;
- if (finishType.isFinal)
- {
- // we now know how many offsets we have and can resize the offsets properly
- offsets = this.offsets.copy(count * 8L);
- this.offsets.unreference();
- }
- else
+ // i.e. FinishType == EARLY; we will use the RefCountedMemory in possibly multiple instances
+ SHARED,
+ // i.e. FinishType == EARLY, but the sstable has been completely written, so we can
+ // finalise the contents and size of the memory, but must retain a reference to it
+ SHARED_FINAL,
+ // i.e. FinishType == NORMAL or FINISH_EARLY, i.e. we have actually finished writing the table
+ // and will never need to open the metadata again, so we can release any references to it here
+ FINAL
+ }
+
+ public CompressionMetadata open(long dataLength, long compressedLength, OpenType type)
+ {
+ RefCountedMemory offsets = this.offsets;
+ int count = this.count;
+ switch (type)
{
- offsets = this.offsets;
+ case FINAL: case SHARED_FINAL:
+ // maybe resize the data
+ if (this.offsets.size() != count * 8L)
+ {
+ offsets = this.offsets.copy(count * 8L);
+ // release our reference to the original shared data;
+ // we don't do this if not resizing since we must pass out existing
+ // reference onto our caller
+ this.offsets.unreference();
+ }
+ // null out our reference to the original shared data to catch accidental reuse
+ this.offsets = null;
+ if (type == OpenType.SHARED_FINAL)
+ {
+ // we will use the data again, so stash our resized data back, and take an extra reference to it
+ this.offsets = offsets;
+ this.offsets.reference();
+ }
+ break;
+
+ case SHARED:
+
+ // we should only be opened on a compression data boundary; truncate our size to this boundary
+ assert dataLength % parameters.chunkLength() == 0;
+ count = (int) (dataLength / parameters.chunkLength());
+ // grab our actual compressed length from the next offset from our the position we're opened to
+ if (count < this.count)
+ compressedLength = offsets.getLong(count * 8);
+ break;
+
+ default:
+ throw new AssertionError();
}
return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength, Descriptor.Version.CURRENT.hasPostCompressionAdlerChecksums);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 5abd1b7..202bc4d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1314,11 +1314,7 @@ public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
long left = getPosition(leftBound, Operator.GT).position;
long right = (rightBound.compareTo(last) > 0)
- ? (openReason == OpenReason.EARLY
- // if opened early, we overlap with the old sstables by one key, so we know that the last
- // (and further) key(s) will be streamed from these if necessary
- ? getPosition(last, Operator.GT, false, true).position
- : uncompressedLength())
+ ? uncompressedLength()
: getPosition(rightBound, Operator.GT).position;
if (left == right)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 2c1cf0e..b67685d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -398,8 +398,8 @@ public class SSTableWriter extends SSTable
assert boundary.indexLength > 0 && boundary.dataLength > 0;
Descriptor link = makeTmpLinks();
// open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
- SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
- SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), FinishType.EARLY);
+ SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), boundary.indexLength);
+ SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), boundary.dataLength);
SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
components, metadata,
partitioner, ifile,
@@ -451,8 +451,8 @@ public class SSTableWriter extends SSTable
desc = makeTmpLinks();
// finalize in-memory state for the reader
- SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType);
- SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType);
+ SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType.isFinal);
+ SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType.isFinal);
SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
components,
this.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index e4c363a..f04a1fb 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -45,9 +45,10 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
// only one segment in a standard-io file
}
- public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
+ public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
{
- long length = new File(path).length();
+ assert !isFinal || overrideLength <= 0;
+ long length = overrideLength > 0 ? overrideLength : new File(path).length();
return new BufferedPoolingSegmentedFile(path, length);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index c29bbf3..1a1d208 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -53,9 +53,10 @@ public class BufferedSegmentedFile extends SegmentedFile
// only one segment in a standard-io file
}
- public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
+ public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
{
- long length = new File(path).length();
+ assert !isFinal || overrideLength <= 0;
+ long length = overrideLength > 0 ? overrideLength : new File(path).length();
return new BufferedSegmentedFile(path, length);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index c514b80..40a54dc 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -23,7 +23,6 @@ import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
import org.apache.cassandra.io.compress.CompressedThrottledReader;
import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableWriter;
public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile implements ICompressedFile
{
@@ -68,9 +67,9 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
// only one segment in a standard-io file
}
- public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
+ public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
{
- return new CompressedPoolingSegmentedFile(path, metadata(path, finishType));
+ return new CompressedPoolingSegmentedFile(path, metadata(path, overrideLength, isFinal));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index 6b5c2e1..9721bc3 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -73,17 +73,18 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
// only one segment in a standard-io file
}
- protected CompressionMetadata metadata(String path, SSTableWriter.FinishType finishType)
+ protected CompressionMetadata metadata(String path, long overrideLength, boolean isFinal)
{
if (writer == null)
return CompressionMetadata.create(path);
- return writer.open(finishType);
+ return writer.open(overrideLength, isFinal);
}
- public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
+ public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
{
- return new CompressedSegmentedFile(path, metadata(path, finishType));
+ assert !isFinal || overrideLength <= 0;
+ return new CompressedSegmentedFile(path, metadata(path, overrideLength, isFinal));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 8067c68..1b23343 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -183,31 +183,38 @@ public class MmappedSegmentedFile extends SegmentedFile
}
}
- public SegmentedFile complete(String path, SSTableWriter.FinishType finishType)
+ public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
{
- long length = new File(path).length();
+ assert !isFinal || overrideLength <= 0;
+ long length = overrideLength > 0 ? overrideLength : new File(path).length();
// create the segments
- return new MmappedSegmentedFile(path, length, createSegments(path));
+ return new MmappedSegmentedFile(path, length, createSegments(path, length));
}
- private Segment[] createSegments(String path)
+ private Segment[] createSegments(String path, long length)
{
RandomAccessFile raf;
- long length;
try
{
raf = new RandomAccessFile(path, "r");
- length = raf.length();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
+ // if we're early finishing a range that doesn't span multiple segments, but the finished file now does,
+ // we remove these from the end (we loop incase somehow this spans multiple segments, but that would
+ // be a loco dataset
+ while (length < boundaries.get(boundaries.size() - 1))
+ boundaries.remove(boundaries.size() -1);
+
// add a sentinel value == length
List<Long> boundaries = new ArrayList<>(this.boundaries);
if (length != boundaries.get(boundaries.size() - 1))
boundaries.add(length);
+
+
int segcount = boundaries.size() - 1;
Segment[] segments = new Segment[segcount];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
index d3c90c7..4ab98af 100644
--- a/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/PoolingSegmentedFile.java
@@ -67,7 +67,7 @@ public abstract class PoolingSegmentedFile extends SegmentedFile
protected RandomAccessReader createPooledReader()
{
- return RandomAccessReader.open(new File(path), this);
+ return RandomAccessReader.open(new File(path), length, this);
}
public void recycle(RandomAccessReader reader)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 81e45b5..df68ca3 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -55,6 +55,10 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
protected RandomAccessReader(File file, int bufferSize, PoolingSegmentedFile owner) throws FileNotFoundException
{
+ this(file, bufferSize, -1, owner);
+ }
+ protected RandomAccessReader(File file, int bufferSize, long overrideLength, PoolingSegmentedFile owner) throws FileNotFoundException
+ {
super(file, "r");
this.owner = owner;
@@ -69,33 +73,49 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
buffer = new byte[bufferSize];
// we can cache file length in read-only mode
- try
- {
- fileLength = channel.size();
- }
- catch (IOException e)
+ long fileLength = overrideLength;
+ if (fileLength <= 0)
{
- throw new FSReadError(e, filePath);
+ try
+ {
+ fileLength = channel.size();
+ }
+ catch (IOException e)
+ {
+ throw new FSReadError(e, filePath);
+ }
}
+
+ this.fileLength = fileLength;
validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations
}
- public static RandomAccessReader open(File file, PoolingSegmentedFile owner)
+ public static RandomAccessReader open(File file, long overrideSize, PoolingSegmentedFile owner)
{
- return open(file, DEFAULT_BUFFER_SIZE, owner);
+ return open(file, DEFAULT_BUFFER_SIZE, overrideSize, owner);
}
public static RandomAccessReader open(File file)
{
- return open(file, DEFAULT_BUFFER_SIZE, null);
+ return open(file, -1L);
+ }
+
+ public static RandomAccessReader open(File file, long overrideSize)
+ {
+ return open(file, DEFAULT_BUFFER_SIZE, overrideSize, null);
}
@VisibleForTesting
static RandomAccessReader open(File file, int bufferSize, PoolingSegmentedFile owner)
{
+ return open(file, bufferSize, -1L, owner);
+ }
+
+ private static RandomAccessReader open(File file, int bufferSize, long overrideSize, PoolingSegmentedFile owner)
+ {
try
{
- return new RandomAccessReader(file, bufferSize, owner);
+ return new RandomAccessReader(file, bufferSize, overrideSize, owner);
}
catch (FileNotFoundException e)
{
@@ -118,22 +138,27 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
try
{
- if (bufferOffset >= channel.size())
- return;
+ int read = buffer.length;
+ if (bufferOffset + read > fileLength)
+ {
+ if (bufferOffset >= fileLength)
+ return;
+ read = (int) (fileLength - bufferOffset);
+ }
channel.position(bufferOffset); // setting channel position
- int read = 0;
-
- while (read < buffer.length)
+ int offset = 0;
+ while (read > 0)
{
- int n = super.read(buffer, read, buffer.length - read);
+ int n = super.read(buffer, offset, read);
if (n < 0)
- break;
- read += n;
+ throw new IllegalStateException();
+ read -= n;
+ offset += n;
}
- validBufferBytes = read;
+ validBufferBytes = offset;
}
catch (IOException e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 510ed81..146494d 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
-import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
@@ -96,13 +95,13 @@ public abstract class SegmentedFile extends SharedCloseableImpl
public RandomAccessReader createReader()
{
- return RandomAccessReader.open(new File(path));
+ return RandomAccessReader.open(new File(path), length);
}
public RandomAccessReader createThrottledReader(RateLimiter limiter)
{
assert limiter != null;
- return ThrottledReader.open(new File(path), limiter);
+ return ThrottledReader.open(new File(path), length, limiter);
}
public FileDataInput getSegment(long position)
@@ -156,11 +155,21 @@ public abstract class SegmentedFile extends SharedCloseableImpl
* Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
* @param path The file on disk.
*/
- public abstract SegmentedFile complete(String path, SSTableWriter.FinishType openType);
+ protected abstract SegmentedFile complete(String path, long overrideLength, boolean isFinal);
public SegmentedFile complete(String path)
{
- return complete(path, SSTableWriter.FinishType.NORMAL);
+ return complete(path, -1, true);
+ }
+
+ public SegmentedFile complete(String path, boolean isFinal)
+ {
+ return complete(path, -1, isFinal);
+ }
+
+ public SegmentedFile complete(String path, long overrideLength)
+ {
+ return complete(path, overrideLength, false);
}
public void serializeBounds(DataOutput out) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ced7a34/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
index b12a8a8..c4845c5 100644
--- a/src/java/org/apache/cassandra/io/util/ThrottledReader.java
+++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
@@ -30,9 +30,9 @@ public class ThrottledReader extends RandomAccessReader
{
private final RateLimiter limiter;
- protected ThrottledReader(File file, RateLimiter limiter) throws FileNotFoundException
+ protected ThrottledReader(File file, long overrideLength, RateLimiter limiter) throws FileNotFoundException
{
- super(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, null);
+ super(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, overrideLength, null);
this.limiter = limiter;
}
@@ -42,11 +42,11 @@ public class ThrottledReader extends RandomAccessReader
super.reBuffer();
}
- public static ThrottledReader open(File file, RateLimiter limiter)
+ public static ThrottledReader open(File file, long overrideLength, RateLimiter limiter)
{
try
{
- return new ThrottledReader(file, limiter);
+ return new ThrottledReader(file, overrideLength, limiter);
}
catch (FileNotFoundException e)
{