You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/10/06 22:30:18 UTC

svn commit: r1179832 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/io/sstable/

Author: jbellis
Date: Thu Oct  6 20:30:17 2011
New Revision: 1179832

URL: http://svn.apache.org/viewvc?rev=1179832&view=rev
Log:
merge from 1.0

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct  6 20:30:17 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1179418
-/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0:1167085-1179828
+/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1179826
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Oct  6 20:30:17 2011
@@ -6,6 +6,7 @@
  * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
 
 1.0.0-final
+ * close scrubbed sstable fd before deleting it (CASSANDRA-3318)
  * fix bug preventing obsolete commitlog segments from being removed
    (CASSANDRA-3269)
  * tolerate whitespace in seed CDL (CASSANDRA-3263)
@@ -13,6 +14,9 @@
    (CASSANDRA-3295)
  * Fix broken CompressedRandomAccessReaderTest (CASSANDRA-3298)
  * (CQL) fix type information returned for wildcard queries (CASSANDRA-3311)
+ * add estimated tasks to LeveledCompactionStrategy (CASSANDRA-3322)
+ * avoid including compaction cache-warming in keycache stats (CASSANDRA-3325)
+ * run compaction and hinted handoff threads at MIN_PRIORITY (CASSANDRA-3308)
 Fixes merged from 0.8 below:
  * Fix tool .bat files when CASSANDRA_HOME contains spaces (CASSANDRA-3258)
  * Force flush of status table when removing/updating token (CASSANDRA-3243)

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct  6 20:30:17 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/contrib:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1179826
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct  6 20:30:17 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1179826
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct  6 20:30:17 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1179826
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct  6 20:30:17 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1179826
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct  6 20:30:17 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1179826
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct  6 20:30:17 2011
@@ -4,8 +4,8 @@
 /cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1179826
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Oct  6 20:30:17 2011
@@ -90,7 +90,7 @@ public class HintedHandOffManager implem
 
     private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
 
-    private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff");
+    private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", Thread.MIN_PRIORITY);
 
     public HintedHandOffManager()
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Oct  6 20:30:17 2011
@@ -483,10 +483,13 @@ public class CompactionManager implement
         // row header (key or data size) is corrupt. (This means our position in the index file will be one row
         // "ahead" of the data file.)
         final RandomAccessReader dataFile = sstable.openDataReader(true);
-
-        String indexFilename = sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
-        RandomAccessReader indexFile = RandomAccessReader.open(new File(indexFilename), true);
+        RandomAccessReader indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
         ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
+        executor.beginCompaction(scrubInfo);
+
+        SSTableWriter writer = null;
+        SSTableReader newSstable = null;
+        int goodRows = 0, badRows = 0, emptyRows = 0;
 
         try
         {
@@ -497,170 +500,155 @@ public class CompactionManager implement
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
-            SSTableReader newSstable = null;
-
-            // errors when creating the writer may leave empty temp files.
-            SSTableWriter writer = maybeCreateWriter(cfs,
-                                                     compactionFileLocation,
-                                                     expectedBloomFilterSize,
-                                                     null,
-                                                     Collections.singletonList(sstable));
-
-            int goodRows = 0, badRows = 0, emptyRows = 0;
+            // TODO errors when creating the writer may leave empty temp files.
+            writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null, Collections.singletonList(sstable));
 
-            executor.beginCompaction(scrubInfo);
-
-            try
+            while (!dataFile.isEOF())
             {
-                while (!dataFile.isEOF())
+                long rowStart = dataFile.getFilePointer();
+                if (logger.isDebugEnabled())
+                    logger.debug("Reading row at " + rowStart);
+
+                DecoratedKey key = null;
+                long dataSize = -1;
+                try
                 {
-                    long rowStart = dataFile.getFilePointer();
+                    key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+                    dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
                     if (logger.isDebugEnabled())
-                        logger.debug("Reading row at " + rowStart);
+                        logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    // check for null key below
+                }
 
-                    DecoratedKey key = null;
-                    long dataSize = -1;
-                    try
-                    {
-                        key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
-                        dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
-                        if (logger.isDebugEnabled())
-                            logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
-                    }
-                    catch (Throwable th)
-                    {
-                        throwIfFatal(th);
-                        // check for null key below
-                    }
+                ByteBuffer currentIndexKey = nextIndexKey;
+                long nextRowPositionFromIndex;
+                try
+                {
+                    nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+                    nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+                }
+                catch (Throwable th)
+                {
+                    logger.warn("Error reading index file", th);
+                    nextIndexKey = null;
+                    nextRowPositionFromIndex = dataFile.length();
+                }
 
-                    ByteBuffer currentIndexKey = nextIndexKey;
-                    long nextRowPositionFromIndex;
-                    try
-                    {
-                        nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
-                        nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
-                    }
-                    catch (Throwable th)
-                    {
-                        logger.warn("Error reading index file", th);
-                        nextIndexKey = null;
-                        nextRowPositionFromIndex = dataFile.length();
-                    }
-
-                    long dataStart = dataFile.getFilePointer();
-                    long dataStartFromIndex = currentIndexKey == null
-                                            ? -1
-                                            : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
-                    long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
-                    assert currentIndexKey != null || indexFile.isEOF();
-                    if (logger.isDebugEnabled() && currentIndexKey != null)
-                        logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
+                long dataStart = dataFile.getFilePointer();
+                long dataStartFromIndex = currentIndexKey == null
+                                        ? -1
+                                        : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+                long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+                assert currentIndexKey != null || indexFile.isEOF();
+                if (logger.isDebugEnabled() && currentIndexKey != null)
+                    logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
 
-                    writer.mark();
-                    try
+                writer.mark();
+                try
+                {
+                    if (key == null)
+                        throw new IOError(new IOException("Unable to read row key from data file"));
+                    if (dataSize > dataFile.length())
+                        throw new IOError(new IOException("Impossible row size " + dataSize));
+                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+                    if (compactedRow.isEmpty())
                     {
-                        if (key == null)
-                            throw new IOError(new IOException("Unable to read row key from data file"));
-                        if (dataSize > dataFile.length())
-                            throw new IOError(new IOException("Impossible row size " + dataSize));
-                        SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
-                        AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                        if (compactedRow.isEmpty())
-                        {
-                            emptyRows++;
-                        }
-                        else
-                        {
-                            writer.append(compactedRow);
-                            goodRows++;
-                        }
-                        if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
-                            logger.warn("Index file contained a different key or row size; using key from data file");
+                        emptyRows++;
                     }
-                    catch (Throwable th)
+                    else
                     {
-                        throwIfFatal(th);
-                        logger.warn("Non-fatal error reading row (stacktrace follows)", th);
-                        writer.resetAndTruncate();
-
-                        if (currentIndexKey != null
-                            && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+                        writer.append(compactedRow);
+                        goodRows++;
+                    }
+                    if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
+                        logger.warn("Index file contained a different key or row size; using key from data file");
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    logger.warn("Non-fatal error reading row (stacktrace follows)", th);
+                    writer.resetAndTruncate();
+
+                    if (currentIndexKey != null
+                        && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+                    {
+                        logger.info(String.format("Retrying from row index; data is %s bytes starting at %s",
+                                                  dataSizeFromIndex, dataStartFromIndex));
+                        key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
+                        try
                         {
-                            logger.info(String.format("Retrying from row index; data is %s bytes starting at %s",
-                                                      dataSizeFromIndex, dataStartFromIndex));
-                            key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
-                            try
+                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+                            if (compactedRow.isEmpty())
                             {
-                                SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
-                                AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                                if (compactedRow.isEmpty())
-                                {
-                                    emptyRows++;
-                                }
-                                else
-                                {
-                                    writer.append(compactedRow);
-                                    goodRows++;
-                                }
+                                emptyRows++;
                             }
-                            catch (Throwable th2)
+                            else
                             {
-                                throwIfFatal(th2);
-                                // Skipping rows is dangerous for counters (see CASSANDRA-2759)
-                                if (isCommutative)
-                                    throw new IOError(th2);
-
-                                logger.warn("Retry failed too.  Skipping to next row (retry's stacktrace follows)", th2);
-                                writer.resetAndTruncate();
-                                dataFile.seek(nextRowPositionFromIndex);
-                                badRows++;
+                                writer.append(compactedRow);
+                                goodRows++;
                             }
                         }
-                        else
+                        catch (Throwable th2)
                         {
+                            throwIfFatal(th2);
                             // Skipping rows is dangerous for counters (see CASSANDRA-2759)
                             if (isCommutative)
-                                throw new IOError(th);
+                                throw new IOError(th2);
 
-                            logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
-                            if (currentIndexKey != null)
-                                dataFile.seek(nextRowPositionFromIndex);
+                            logger.warn("Retry failed too.  Skipping to next row (retry's stacktrace follows)", th2);
+                            writer.resetAndTruncate();
+                            dataFile.seek(nextRowPositionFromIndex);
                             badRows++;
                         }
                     }
+                    else
+                    {
+                        // Skipping rows is dangerous for counters (see CASSANDRA-2759)
+                        if (isCommutative)
+                            throw new IOError(th);
+
+                        logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
+                        if (currentIndexKey != null)
+                            dataFile.seek(nextRowPositionFromIndex);
+                        badRows++;
+                    }
                 }
-
-                if (writer.getFilePointer() > 0)
-                    newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
-            }
-            finally
-            {
-                writer.cleanupIfNecessary();
             }
 
-            if (newSstable != null)
-            {
-                cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
-                logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
-                if (badRows > 0)
-                    logger.warn("Unable to recover " + badRows + " rows that were skipped.  You can attempt manual recovery from the pre-scrub snapshot.  You can also run nodetool repair to transfer the data from a healthy replica, if any");
-            }
-            else
-            {
-                cfs.markCompacted(Arrays.asList(sstable));
-                if (badRows > 0)
-                    logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
-                else
-                    logger.info("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
-            }
+            if (writer.getFilePointer() > 0)
+                newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
         }
         finally
         {
+            if (writer != null)
+                writer.cleanupIfNecessary();
             FileUtils.closeQuietly(dataFile);
             FileUtils.closeQuietly(indexFile);
 
             executor.finishCompaction(scrubInfo);
         }
+
+        if (newSstable == null)
+        {
+            cfs.markCompacted(Arrays.asList(sstable));
+            if (badRows > 0)
+                logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
+            else
+                logger.info("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
+        }
+        else
+        {
+            cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
+            logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
+            if (badRows > 0)
+                logger.warn("Unable to recover " + badRows + " rows that were skipped.  You can attempt manual recovery from the pre-scrub snapshot.  You can also run nodetool repair to transfer the data from a healthy replica, if any");
+        }
     }
 
     private void throwIfFatal(Throwable th)
@@ -981,7 +969,7 @@ public class CompactionManager implement
 
         protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
         {
-            super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new NamedThreadFactory(name));
+            super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new NamedThreadFactory(name, Thread.MIN_PRIORITY));
         }
 
         private CompactionExecutor(int threadCount, String name)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Thu Oct  6 20:30:17 2011
@@ -169,7 +169,7 @@ public class CompactionTask extends Abst
                 {
                     for (SSTableReader sstable : toCompact)
                     {
-                        if (sstable.getCachedPosition(row.key) != null)
+                        if (sstable.getCachedPosition(row.key, false) != null)
                         {
                             cachedKeys.put(row.key, position);
                             break;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Thu Oct  6 20:30:17 2011
@@ -135,7 +135,7 @@ public class LeveledCompactionStrategy e
 
     public int getEstimatedRemainingTasks()
     {
-        return 0;
+        return manifest.getEstimatedTasks();
     }
 
     public void handleNotification(INotification notification, Object sender)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java Thu Oct  6 20:30:17 2011
@@ -418,4 +418,17 @@ public class LeveledManifest
     {
         return generations[i];
     }
+
+    // Estimated pending compactions: each level's bytes over its target, counted in max-size sstables.
+    public int getEstimatedTasks()
+    {
+        long tasks = 0;
+        for (int i = generations.length - 1; i >= 0; i--)
+        {
+            // clamp at zero (an under-full level must not cancel another level's backlog); MB -> bytes
+            if (!generations[i].isEmpty())
+                tasks += Math.max(0L, SSTableReader.getTotalBytes(generations[i]) - maxBytesForLevel(i)) / (maxSSTableSizeInMB * 1024L * 1024L);
+        }
+        return (int) Math.min(tasks, Integer.MAX_VALUE);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Oct  6 20:30:17 2011
@@ -572,15 +572,15 @@ public class SSTableReader extends SSTab
         keyCache.put(new Pair<Descriptor, DecoratedKey>(descriptor, copiedKey), info);
     }
 
-    public Long getCachedPosition(DecoratedKey key)
+    public Long getCachedPosition(DecoratedKey key, boolean updateStats)
     {
-        return getCachedPosition(new Pair<Descriptor, DecoratedKey>(descriptor, key));
+        return getCachedPosition(new Pair<Descriptor, DecoratedKey>(descriptor, key), updateStats);
     }
 
-    private Long getCachedPosition(Pair<Descriptor, DecoratedKey> unifiedKey)
+    private Long getCachedPosition(Pair<Descriptor, DecoratedKey> unifiedKey, boolean updateStats)
     {
         if (keyCache != null && keyCache.getCapacity() > 0)
-            return keyCache.get(unifiedKey);
+            return updateStats ? keyCache.get(unifiedKey) : keyCache.getInternal(unifiedKey);
         return null;
     }
 
@@ -603,7 +603,7 @@ public class SSTableReader extends SSTab
         if (op == Operator.EQ || op == Operator.GE)
         {
             Pair<Descriptor, DecoratedKey> unifiedKey = new Pair<Descriptor, DecoratedKey>(descriptor, decoratedKey);
-            Long cachedPosition = getCachedPosition(unifiedKey);
+            Long cachedPosition = getCachedPosition(unifiedKey, true);
             if (cachedPosition != null)
                 return cachedPosition;
         }