You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/04/30 22:51:41 UTC
[3/6] git commit: fix compaction throttling bursty-ness patch by
yukim and jbellis for CASSANDRA-4316
fix compaction throttling bursty-ness
patch by yukim and jbellis for CASSANDRA-4316
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2b0797b2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2b0797b2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2b0797b2
Branch: refs/heads/cassandra-1.2
Commit: 2b0797b24e2d4a433c0e17506a0d8bb812f8f2dd
Parents: 927c4a4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 30 14:09:25 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Apr 30 15:36:57 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/AbstractCompactionStrategy.java | 4 +-
.../db/compaction/CompactionController.java | 23 ++-------
.../db/compaction/CompactionIterable.java | 12 ++---
.../cassandra/db/compaction/CompactionManager.java | 25 ++++++++-
.../db/compaction/LeveledCompactionStrategy.java | 7 ++-
.../db/compaction/ParallelCompactionIterable.java | 13 ++----
.../apache/cassandra/db/compaction/Scrubber.java | 8 +--
.../io/compress/CompressedRandomAccessReader.java | 2 +-
.../io/compress/CompressedThrottledReader.java | 38 +++++++++++++++
.../io/sstable/SSTableBoundedScanner.java | 6 ++-
.../apache/cassandra/io/sstable/SSTableReader.java | 25 +++++++---
.../cassandra/io/sstable/SSTableScanner.java | 7 ++-
.../apache/cassandra/io/util/ThrottledReader.java | 35 +++++++++++++
.../org/apache/cassandra/tools/SSTableExport.java | 5 +-
.../apache/cassandra/io/sstable/SSTableUtils.java | 4 +-
16 files changed, 151 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 93198f0..bfece4f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.5
+ * fix compaction throttling bursty-ness (CASSANDRA-4316)
* reduce memory consumption of IndexSummary (CASSANDRA-5506)
* remove per-row column name bloom filters (CASSANDRA-5492)
* Include fatal errors in trace events (CASSANDRA-5447)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index a588216..636cb0d 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
+import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -150,9 +151,10 @@ public abstract class AbstractCompactionStrategy
*/
public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
+ RateLimiter limiter = CompactionManager.instance.getRateLimiter();
ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
for (SSTableReader sstable : sstables)
- scanners.add(sstable.getDirectScanner(range));
+ scanners.add(sstable.getDirectScanner(range, limiter));
return scanners;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index f3198ff..f91c7a5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -22,6 +22,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +38,8 @@ import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.AlwaysPresentFilter;
import org.apache.cassandra.utils.Throttle;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Throttle;
/**
* Manage compaction options.
@@ -50,20 +54,6 @@ public class CompactionController
public final int gcBefore;
public final int mergeShardBefore;
- private final Throttle throttle = new Throttle("Cassandra_Throttle", new Throttle.ThroughputFunction()
- {
- /** @return Instantaneous throughput target in bytes per millisecond. */
- public int targetThroughput()
- {
- if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
- // throttling disabled
- return 0;
- // total throughput
- int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
- // per stream throughput (target bytes per MS)
- return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
- }
- });
public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
{
@@ -176,11 +166,6 @@ public class CompactionController
return getCompactedRow(Collections.singletonList(row));
}
- public void mayThrottle(long currentBytes)
- {
- throttle.throttle(currentBytes);
- }
-
public void close()
{
SSTableReader.releaseReferences(overlappingSSTables);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 32b4942..3614ed1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -78,14 +78,10 @@ public class CompactionIterable extends AbstractCompactionIterable
finally
{
rows.clear();
- if ((row++ % 1000) == 0)
- {
- long n = 0;
- for (ICompactionScanner scanner : scanners)
- n += scanner.getCurrentPosition();
- bytesRead = n;
- controller.mayThrottle(bytesRead);
- }
+ long n = 0;
+ for (ICompactionScanner scanner : scanners)
+ n += scanner.getCurrentPosition();
+ bytesRead = n;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 1d273b6..96c3011 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -31,6 +31,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.Multiset;
import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,6 +119,26 @@ public class CompactionManager implements CompactionManagerMBean
private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor);
private final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
+ private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
+
+ /**
+ * Gets the compaction rate limiter. When compaction_throughput_mb_per_sec is 0 or negative, or the node is
+ * bootstrapping, the limiter is set to a rate of Double.MAX_VALUE bytes per second.
+ * Rate unit is bytes per sec.
+ *
+ * @return the RateLimiter held by this manager, with its rate updated to the current setting
+ */
+ public RateLimiter getRateLimiter()
+ {
+ // double math (int would overflow at >= 2048 MB/s); a non-positive setting disables throttling
+ double currentThroughput = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024.0 * 1024.0;
+ if (currentThroughput <= 0 || StorageService.instance.isBootstrapMode())
+ currentThroughput = Double.MAX_VALUE;
+ if (compactionRateLimiter.getRate() != currentThroughput)
+ compactionRateLimiter.setRate(currentThroughput);
+ return compactionRateLimiter;
+ }
+
/**
* @return A lock, for which acquisition means no compactions can run.
*/
@@ -568,7 +589,7 @@ public class CompactionManager implements CompactionManagerMBean
if (compactionFileLocation == null)
throw new IOException("disk full");
- SSTableScanner scanner = sstable.getDirectScanner();
+ SSTableScanner scanner = sstable.getDirectScanner(getRateLimiter());
long rowsRead = 0;
List<IColumn> indexedColumnsInRow = null;
@@ -628,8 +649,6 @@ public class CompactionManager implements CompactionManagerMBean
}
}
}
- if ((rowsRead++ % 1000) == 0)
- controller.mayThrottle(scanner.getCurrentPosition());
}
if (writer != null)
newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 9a73299..f964297 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.*;
import com.google.common.primitives.Doubles;
+import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,7 +179,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
{
// L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
for (SSTableReader sstable : byLevel.get(level))
- scanners.add(sstable.getDirectScanner(range));
+ scanners.add(sstable.getDirectScanner(range, CompactionManager.instance.getRateLimiter()));
}
else
{
@@ -208,7 +209,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
this.sstables = new ArrayList<SSTableReader>(sstables);
Collections.sort(this.sstables, SSTable.sstableComparator);
sstableIterator = this.sstables.iterator();
- currentScanner = sstableIterator.next().getDirectScanner(range);
+ currentScanner = sstableIterator.next().getDirectScanner(range, CompactionManager.instance.getRateLimiter());
long length = 0;
for (SSTableReader sstable : sstables)
@@ -233,7 +234,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
currentScanner = null;
return endOfData();
}
- currentScanner = sstableIterator.next().getDirectScanner(range);
+ currentScanner = sstableIterator.next().getDirectScanner(range, CompactionManager.instance.getRateLimiter());
}
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index e91846d..0f9407f 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -117,7 +117,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
private class Reducer extends MergeIterator.Reducer<RowContainer, CompactedRowContainer>
{
private final List<RowContainer> rows = new ArrayList<RowContainer>();
- private int row = 0;
private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(),
Integer.MAX_VALUE,
@@ -137,14 +136,10 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
ParallelCompactionIterable.this.updateCounterFor(rows.size());
CompactedRowContainer compacted = getCompactedRow(rows);
rows.clear();
- if ((row++ % 1000) == 0)
- {
- long n = 0;
- for (ICompactionScanner scanner : scanners)
- n += scanner.getCurrentPosition();
- bytesRead = n;
- controller.mayThrottle(bytesRead);
- }
+ long n = 0;
+ for (ICompactionScanner scanner : scanners)
+ n += scanner.getCurrentPosition();
+ bytesRead = n;
return compacted;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0601857..cb529cb 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -45,8 +45,6 @@ public class Scrubber implements Closeable
private final RandomAccessReader indexFile;
private final ScrubInfo scrubInfo;
- private long rowsRead;
-
private SSTableWriter writer;
private SSTableReader newSstable;
private SSTableReader newInOrderSstable;
@@ -94,7 +92,9 @@ public class Scrubber implements Closeable
// we'll also loop through the index at the same time, using the position from the index to recover if the
// row header (key or data size) is corrupt. (This means our position in the index file will be one row
// "ahead" of the data file.)
- this.dataFile = sstable.openDataReader(true);
+ this.dataFile = isOffline
+ ? sstable.openDataReader(true)
+ : sstable.openDataReader(CompactionManager.instance.getRateLimiter());
this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
this.scrubInfo = new ScrubInfo(dataFile, sstable);
}
@@ -249,8 +249,6 @@ public class Scrubber implements Closeable
badRows++;
}
}
- if ((rowsRead++ % 1000) == 0)
- controller.mayThrottle(dataFile.getFilePointer());
}
if (writer.getFilePointer() > 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index f245851..9da1c97 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -70,7 +70,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
// raw checksum bytes
private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
- private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException
+ protected CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException
{
super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, owner);
this.metadata = metadata;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
new file mode 100644
index 0000000..1b7b7a4
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
@@ -0,0 +1,38 @@
+package org.apache.cassandra.io.compress;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.io.util.PoolingSegmentedFile;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+public class CompressedThrottledReader extends CompressedRandomAccessReader
+{
+ private final RateLimiter limiter;
+
+ public CompressedThrottledReader(String file, CompressionMetadata metadata, RateLimiter limiter) throws FileNotFoundException
+ {
+ super(file, metadata, true, null);
+ this.limiter = limiter;
+ }
+
+ protected void reBuffer()
+ {
+ limiter.acquire(buffer.length);
+ super.reBuffer();
+ }
+
+ public static CompressedThrottledReader open(String file, CompressionMetadata metadata, RateLimiter limiter)
+ {
+ try
+ {
+ return new CompressedThrottledReader(file, metadata, limiter);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
index 56be212..a3c6bbb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.io.sstable;
import java.util.Arrays;
import java.util.Iterator;
+import com.google.common.util.concurrent.RateLimiter;
+
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.utils.Pair;
@@ -32,9 +34,9 @@ public class SSTableBoundedScanner extends SSTableScanner
private final Iterator<Pair<Long, Long>> rangeIterator;
private Pair<Long, Long> currentRange;
- SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Iterator<Pair<Long, Long>> rangeIterator)
+ SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Iterator<Pair<Long, Long>> rangeIterator, RateLimiter limiter)
{
- super(sstable, skipCache);
+ super(sstable, skipCache, limiter);
this.rangeIterator = rangeIterator;
assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise
currentRange = rangeIterator.next();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index e4a2fe1..6b71223 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.*;
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +48,7 @@ import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedThrottledReader;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.service.CacheService;
@@ -946,10 +949,10 @@ public class SSTableReader extends SSTable
* Direct I/O SSTableScanner
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public SSTableScanner getDirectScanner()
- {
- return new SSTableScanner(this, true);
- }
+ public SSTableScanner getDirectScanner(RateLimiter limiter)
+ {
+ return new SSTableScanner(this, true, limiter);
+ }
/**
* Direct I/O SSTableScanner over a defined range of tokens.
@@ -957,14 +960,14 @@ public class SSTableReader extends SSTable
* @param range the range of keys to cover
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public ICompactionScanner getDirectScanner(Range<Token> range)
+ public ICompactionScanner getDirectScanner(Range<Token> range, RateLimiter limiter)
{
if (range == null)
- return getDirectScanner();
+ return getDirectScanner(limiter);
Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
return rangeIterator.hasNext()
- ? new SSTableBoundedScanner(this, true, rangeIterator)
+ ? new SSTableBoundedScanner(this, true, rangeIterator, limiter)
: new EmptyCompactionScanner(getFilename());
}
@@ -1117,6 +1120,14 @@ public class SSTableReader extends SSTable
return sstableMetadata.ancestors;
}
+ public RandomAccessReader openDataReader(RateLimiter limiter)
+ {
+ assert limiter != null;
+ return compression
+ ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter)
+ : ThrottledReader.open(new File(getFilename()), limiter);
+ }
+
public RandomAccessReader openDataReader(boolean skipIOCache)
{
return compression
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 22ac485..1df5842 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
+import com.google.common.util.concurrent.RateLimiter;
+
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.RowPosition;
@@ -45,10 +47,11 @@ public class SSTableScanner implements ICompactionScanner
/**
* @param sstable SSTable to scan.
+ * @param limiter rate limiter used to throttle reads of the data file; if null, the data file is read unthrottled
*/
- SSTableScanner(SSTableReader sstable, boolean skipCache)
+ SSTableScanner(SSTableReader sstable, boolean skipCache, RateLimiter limiter)
{
- this.dfile = sstable.openDataReader(skipCache);
+ this.dfile = limiter == null ? sstable.openDataReader(skipCache) : sstable.openDataReader(limiter);
this.ifile = sstable.openIndexReader(skipCache);
this.sstable = sstable;
this.filter = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/io/util/ThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ThrottledReader.java b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
new file mode 100644
index 0000000..d67550a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ThrottledReader.java
@@ -0,0 +1,35 @@
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+public class ThrottledReader extends RandomAccessReader
+{
+ private final RateLimiter limiter;
+
+ protected ThrottledReader(File file, RateLimiter limiter) throws FileNotFoundException
+ {
+ super(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, true, null);
+ this.limiter = limiter;
+ }
+
+ protected void reBuffer()
+ {
+ limiter.acquire(buffer.length);
+ super.reBuffer();
+ }
+
+ public static ThrottledReader open(File file, RateLimiter limiter)
+ {
+ try
+ {
+ return new ThrottledReader(file, limiter);
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 51cdc72..90274d1 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Option;
@@ -349,7 +350,7 @@ public class SSTableExport
public static void export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
{
SSTableReader reader = SSTableReader.open(desc);
- SSTableScanner scanner = reader.getDirectScanner();
+ SSTableScanner scanner = reader.getDirectScanner(null);
IPartitioner<?> partitioner = reader.partitioner;
@@ -406,7 +407,7 @@ public class SSTableExport
SSTableIdentityIterator row;
- SSTableScanner scanner = reader.getDirectScanner();
+ SSTableScanner scanner = reader.getDirectScanner(null);
outs.println("[");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2b0797b2/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 2b0a13a..0b8fd25 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -72,8 +72,8 @@ public class SSTableUtils
public static void assertContentEquals(SSTableReader lhs, SSTableReader rhs) throws IOException
{
- SSTableScanner slhs = lhs.getDirectScanner();
- SSTableScanner srhs = rhs.getDirectScanner();
+ SSTableScanner slhs = lhs.getDirectScanner(null);
+ SSTableScanner srhs = rhs.getDirectScanner(null);
while (slhs.hasNext())
{
OnDiskAtomIterator ilhs = slhs.next();