You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/04/11 16:13:28 UTC

[11/20] git commit: Make scrub and cleanup operations throttled patch by Vijay; reviewed by yukim for CASSANDRA-4100

Make scrub and cleanup operations throttled
patch by Vijay; reviewed by yukim for CASSANDRA-4100


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/092dc586
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/092dc586
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/092dc586

Branch: refs/heads/trunk
Commit: 092dc586f556b1c2bef048d9ee8672240a31f442
Parents: eef93e7
Author: Vijay Parthasarathy <vi...@gmail.com>
Authored: Tue Apr 10 17:34:18 2012 -0700
Committer: Vijay Parthasarathy <vi...@gmail.com>
Committed: Tue Apr 10 17:34:18 2012 -0700

----------------------------------------------------------------------
 .../db/compaction/AbstractCompactionIterable.java  |   20 --------------
 .../db/compaction/CompactionController.java        |   21 +++++++++++++++
 .../db/compaction/CompactionIterable.java          |    2 +-
 .../cassandra/db/compaction/CompactionManager.java |    6 ++++
 .../db/compaction/ParallelCompactionIterable.java  |    2 +-
 5 files changed, 29 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index e05a64c..95e6590 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -28,12 +28,9 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.Throttle;
 
 public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
 {
@@ -45,8 +42,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
     protected volatile long bytesRead;
     protected final List<SSTableScanner> scanners;
 
-    protected final Throttle throttle;
-
     public AbstractCompactionIterable(CompactionController controller, OperationType type, List<SSTableScanner> scanners)
     {
         this.controller = controller;
@@ -58,21 +53,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
         for (SSTableScanner scanner : scanners)
             bytes += scanner.getFileLength();
         this.totalBytes = bytes;
-
-        this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
-        {
-            /** @return Instantaneous throughput target in bytes per millisecond. */
-            public int targetThroughput()
-            {
-                if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
-                    // throttling disabled
-                    return 0;
-                // total throughput
-                int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
-                // per stream throughput (target bytes per MS)
-                return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
-            }
-        });
     }
 
     protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 1da6f9c..9eaefe7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -30,6 +30,8 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Throttle;
 import org.apache.cassandra.utils.IntervalTree.Interval;
 import org.apache.cassandra.utils.IntervalTree.IntervalTree;
 
@@ -47,6 +49,20 @@ public class CompactionController
     public final int gcBefore;
     public boolean keyExistenceIsExpensive;
     public final int mergeShardBefore;
+    private final Throttle throttle = new Throttle("Cassandra_Throttle", new Throttle.ThroughputFunction()
+    {
+        /** @return Instantaneous throughput target in bytes per millisecond. */
+        public int targetThroughput()
+        {
+            if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
+                // throttling disabled
+                return 0;
+            // total throughput
+            int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
+            // per stream throughput (target bytes per MS)
+            return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
+        }
+    });
 
     public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
     {
@@ -153,4 +169,9 @@ public class CompactionController
     {
         return getCompactedRow(Collections.singletonList(row));
     }
+    
+    public void mayThrottle(long currentBytes)
+    {
+        throttle.throttle(currentBytes);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index eb88489..8a4aa0e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -119,7 +119,7 @@ public class CompactionIterable extends AbstractCompactionIterable
                     for (SSTableScanner scanner : scanners)
                         n += scanner.getFilePointer();
                     bytesRead = n;
-                    throttle.throttle(bytesRead);
+                    controller.mayThrottle(bytesRead);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 37361cf..d14a13a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -468,6 +468,7 @@ public class CompactionManager implements CompactionManagerMBean
         // row header (key or data size) is corrupt. (This means our position in the index file will be one row
         // "ahead" of the data file.)
         final RandomAccessReader dataFile = sstable.openDataReader(true);
+        long rowsRead = 0;
         RandomAccessReader indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
         ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
         executor.beginCompaction(scrubInfo);
@@ -606,6 +607,8 @@ public class CompactionManager implements CompactionManagerMBean
                         badRows++;
                     }
                 }
+                if ((rowsRead++ % 1000) == 0)
+                    controller.mayThrottle(dataFile.getFilePointer());
             }
 
             if (writer.getFilePointer() > 0)
@@ -689,6 +692,7 @@ public class CompactionManager implements CompactionManagerMBean
                 throw new IOException("disk full");
 
             SSTableScanner scanner = sstable.getDirectScanner();
+            long rowsRead = 0;
             Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
             List<IColumn> indexedColumnsInRow = null;
 
@@ -748,6 +752,8 @@ public class CompactionManager implements CompactionManagerMBean
                             }
                         }
                     }
+                    if ((rowsRead++ % 1000) == 0)
+                        controller.mayThrottle(scanner.getFilePointer());
                 }
                 if (writer != null)
                     newSstable = writer.closeAndOpenReader(sstable.maxDataAge);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/092dc586/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 764a549..03a29cd 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -167,7 +167,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                 for (SSTableScanner scanner : scanners)
                     n += scanner.getFilePointer();
                 bytesRead = n;
-                throttle.throttle(bytesRead);
+                controller.mayThrottle(bytesRead);
             }
             return compacted;
         }