You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/04/11 16:13:28 UTC
[11/20] git commit: Make scrub and cleanup operations throttled patch
by Vijay; reviewed by yukim for CASSANDRA-4100
Make scrub and cleanup operations throttled
patch by Vijay; reviewed by yukim for CASSANDRA-4100
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/092dc586
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/092dc586
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/092dc586
Branch: refs/heads/trunk
Commit: 092dc586f556b1c2bef048d9ee8672240a31f442
Parents: eef93e7
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Tue Apr 10 17:34:18 2012 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Tue Apr 10 17:34:18 2012 -0700
----------------------------------------------------------------------
.../db/compaction/AbstractCompactionIterable.java | 20 --------------
.../db/compaction/CompactionController.java | 21 +++++++++++++++
.../db/compaction/CompactionIterable.java | 2 +-
.../cassandra/db/compaction/CompactionManager.java | 6 ++++
.../db/compaction/ParallelCompactionIterable.java | 2 +-
5 files changed, 29 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index e05a64c..95e6590 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -28,12 +28,9 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.Throttle;
public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
{
@@ -45,8 +42,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
protected volatile long bytesRead;
protected final List<SSTableScanner> scanners;
- protected final Throttle throttle;
-
public AbstractCompactionIterable(CompactionController controller, OperationType type, List<SSTableScanner> scanners)
{
this.controller = controller;
@@ -58,21 +53,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
for (SSTableScanner scanner : scanners)
bytes += scanner.getFileLength();
this.totalBytes = bytes;
-
- this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
- {
- /** @return Instantaneous throughput target in bytes per millisecond. */
- public int targetThroughput()
- {
- if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
- // throttling disabled
- return 0;
- // total throughput
- int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
- // per stream throughput (target bytes per MS)
- return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
- }
- });
}
protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 1da6f9c..9eaefe7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -30,6 +30,8 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Throttle;
import org.apache.cassandra.utils.IntervalTree.Interval;
import org.apache.cassandra.utils.IntervalTree.IntervalTree;
@@ -47,6 +49,20 @@ public class CompactionController
public final int gcBefore;
public boolean keyExistenceIsExpensive;
public final int mergeShardBefore;
+ private final Throttle throttle = new Throttle("Cassandra_Throttle", new Throttle.ThroughputFunction()
+ {
+ /** @return Instantaneous throughput target in bytes per millisecond. */
+ public int targetThroughput()
+ {
+ if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
+ // throttling disabled
+ return 0;
+ // total throughput
+ int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
+ // per stream throughput (target bytes per MS)
+ return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
+ }
+ });
public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
{
@@ -153,4 +169,9 @@ public class CompactionController
{
return getCompactedRow(Collections.singletonList(row));
}
+
+ public void mayThrottle(long currentBytes)
+ {
+ throttle.throttle(currentBytes);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index eb88489..8a4aa0e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -119,7 +119,7 @@ public class CompactionIterable extends AbstractCompactionIterable
for (SSTableScanner scanner : scanners)
n += scanner.getFilePointer();
bytesRead = n;
- throttle.throttle(bytesRead);
+ controller.mayThrottle(bytesRead);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 37361cf..d14a13a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -468,6 +468,7 @@ public class CompactionManager implements CompactionManagerMBean
// row header (key or data size) is corrupt. (This means our position in the index file will be one row
// "ahead" of the data file.)
final RandomAccessReader dataFile = sstable.openDataReader(true);
+ long rowsRead = 0;
RandomAccessReader indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
executor.beginCompaction(scrubInfo);
@@ -606,6 +607,8 @@ public class CompactionManager implements CompactionManagerMBean
badRows++;
}
}
+ if ((rowsRead++ % 1000) == 0)
+ controller.mayThrottle(dataFile.getFilePointer());
}
if (writer.getFilePointer() > 0)
@@ -689,6 +692,7 @@ public class CompactionManager implements CompactionManagerMBean
throw new IOException("disk full");
SSTableScanner scanner = sstable.getDirectScanner();
+ long rowsRead = 0;
Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
List<IColumn> indexedColumnsInRow = null;
@@ -748,6 +752,8 @@ public class CompactionManager implements CompactionManagerMBean
}
}
}
+ if ((rowsRead++ % 1000) == 0)
+ controller.mayThrottle(scanner.getFilePointer());
}
if (writer != null)
newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 764a549..03a29cd 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -167,7 +167,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
for (SSTableScanner scanner : scanners)
n += scanner.getFilePointer();
bytesRead = n;
- throttle.throttle(bytesRead);
+ controller.mayThrottle(bytesRead);
}
return compacted;
}