You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/10/06 22:30:18 UTC
svn commit: r1179832 - in /cassandra/trunk: ./ contrib/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/io/sstable/
Author: jbellis
Date: Thu Oct 6 20:30:17 2011
New Revision: 1179832
URL: http://svn.apache.org/viewvc?rev=1179832&view=rev
Log:
merge from 1.0
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 6 20:30:17 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1179418
-/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0:1167085-1179828
+/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1179826
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Oct 6 20:30:17 2011
@@ -6,6 +6,7 @@
* off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
1.0.0-final
+ * close scrubbed sstable fd before deleting it (CASSANDRA-3318)
* fix bug preventing obsolete commitlog segments from being removed
(CASSANDRA-3269)
* tolerate whitespace in seed CDL (CASSANDRA-3263)
@@ -13,6 +14,9 @@
(CASSANDRA-3295)
* Fix broken CompressedRandomAccessReaderTest (CASSANDRA-3298)
* (CQL) fix type information returned for wildcard queries (CASSANDRA-3311)
+ * add estimated tasks to LeveledCompactionStrategy (CASSANDRA-3322)
+ * avoid including compaction cache-warming in keycache stats (CASSANDRA-3325)
+ * run compaction and hinted handoff threads at MIN_PRIORITY (CASSANDRA-3308)
Fixes merged from 0.8 below:
* Fix tool .bat files when CASSANDRA_HOME contains spaces (CASSANDRA-3258)
* Force flush of status table when removing/updating token (CASSANDRA-3243)
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 6 20:30:17 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/contrib:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1179826
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 6 20:30:17 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1179826
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 6 20:30:17 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1179826
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 6 20:30:17 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1179826
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 6 20:30:17 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1179826
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 6 20:30:17 2011
@@ -4,8 +4,8 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1178554,1178785,1178957,1179359,1179364
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1179418
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1179416
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1179828
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1179826
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Thu Oct 6 20:30:17 2011
@@ -90,7 +90,7 @@ public class HintedHandOffManager implem
private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
- private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff");
+ private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff", Thread.MIN_PRIORITY);
public HintedHandOffManager()
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Oct 6 20:30:17 2011
@@ -483,10 +483,13 @@ public class CompactionManager implement
// row header (key or data size) is corrupt. (This means our position in the index file will be one row
// "ahead" of the data file.)
final RandomAccessReader dataFile = sstable.openDataReader(true);
-
- String indexFilename = sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
- RandomAccessReader indexFile = RandomAccessReader.open(new File(indexFilename), true);
+ RandomAccessReader indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
+ executor.beginCompaction(scrubInfo);
+
+ SSTableWriter writer = null;
+ SSTableReader newSstable = null;
+ int goodRows = 0, badRows = 0, emptyRows = 0;
try
{
@@ -497,170 +500,155 @@ public class CompactionManager implement
assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
}
- SSTableReader newSstable = null;
-
- // errors when creating the writer may leave empty temp files.
- SSTableWriter writer = maybeCreateWriter(cfs,
- compactionFileLocation,
- expectedBloomFilterSize,
- null,
- Collections.singletonList(sstable));
-
- int goodRows = 0, badRows = 0, emptyRows = 0;
+ // TODO errors when creating the writer may leave empty temp files.
+ writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null, Collections.singletonList(sstable));
- executor.beginCompaction(scrubInfo);
-
- try
+ while (!dataFile.isEOF())
{
- while (!dataFile.isEOF())
+ long rowStart = dataFile.getFilePointer();
+ if (logger.isDebugEnabled())
+ logger.debug("Reading row at " + rowStart);
+
+ DecoratedKey key = null;
+ long dataSize = -1;
+ try
{
- long rowStart = dataFile.getFilePointer();
+ key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+ dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
if (logger.isDebugEnabled())
- logger.debug("Reading row at " + rowStart);
+ logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
+ }
+ catch (Throwable th)
+ {
+ throwIfFatal(th);
+ // check for null key below
+ }
- DecoratedKey key = null;
- long dataSize = -1;
- try
- {
- key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
- dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
- if (logger.isDebugEnabled())
- logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
- }
- catch (Throwable th)
- {
- throwIfFatal(th);
- // check for null key below
- }
+ ByteBuffer currentIndexKey = nextIndexKey;
+ long nextRowPositionFromIndex;
+ try
+ {
+ nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+ nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+ }
+ catch (Throwable th)
+ {
+ logger.warn("Error reading index file", th);
+ nextIndexKey = null;
+ nextRowPositionFromIndex = dataFile.length();
+ }
- ByteBuffer currentIndexKey = nextIndexKey;
- long nextRowPositionFromIndex;
- try
- {
- nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
- nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
- }
- catch (Throwable th)
- {
- logger.warn("Error reading index file", th);
- nextIndexKey = null;
- nextRowPositionFromIndex = dataFile.length();
- }
-
- long dataStart = dataFile.getFilePointer();
- long dataStartFromIndex = currentIndexKey == null
- ? -1
- : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
- long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
- assert currentIndexKey != null || indexFile.isEOF();
- if (logger.isDebugEnabled() && currentIndexKey != null)
- logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
+ long dataStart = dataFile.getFilePointer();
+ long dataStartFromIndex = currentIndexKey == null
+ ? -1
+ : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+ long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+ assert currentIndexKey != null || indexFile.isEOF();
+ if (logger.isDebugEnabled() && currentIndexKey != null)
+ logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
- writer.mark();
- try
+ writer.mark();
+ try
+ {
+ if (key == null)
+ throw new IOError(new IOException("Unable to read row key from data file"));
+ if (dataSize > dataFile.length())
+ throw new IOError(new IOException("Impossible row size " + dataSize));
+ SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+ AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+ if (compactedRow.isEmpty())
{
- if (key == null)
- throw new IOError(new IOException("Unable to read row key from data file"));
- if (dataSize > dataFile.length())
- throw new IOError(new IOException("Impossible row size " + dataSize));
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
- {
- emptyRows++;
- }
- else
- {
- writer.append(compactedRow);
- goodRows++;
- }
- if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
- logger.warn("Index file contained a different key or row size; using key from data file");
+ emptyRows++;
}
- catch (Throwable th)
+ else
{
- throwIfFatal(th);
- logger.warn("Non-fatal error reading row (stacktrace follows)", th);
- writer.resetAndTruncate();
-
- if (currentIndexKey != null
- && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+ writer.append(compactedRow);
+ goodRows++;
+ }
+ if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
+ logger.warn("Index file contained a different key or row size; using key from data file");
+ }
+ catch (Throwable th)
+ {
+ throwIfFatal(th);
+ logger.warn("Non-fatal error reading row (stacktrace follows)", th);
+ writer.resetAndTruncate();
+
+ if (currentIndexKey != null
+ && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+ {
+ logger.info(String.format("Retrying from row index; data is %s bytes starting at %s",
+ dataSizeFromIndex, dataStartFromIndex));
+ key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
+ try
{
- logger.info(String.format("Retrying from row index; data is %s bytes starting at %s",
- dataSizeFromIndex, dataStartFromIndex));
- key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
- try
+ SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+ AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+ if (compactedRow.isEmpty())
{
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
- {
- emptyRows++;
- }
- else
- {
- writer.append(compactedRow);
- goodRows++;
- }
+ emptyRows++;
}
- catch (Throwable th2)
+ else
{
- throwIfFatal(th2);
- // Skipping rows is dangerous for counters (see CASSANDRA-2759)
- if (isCommutative)
- throw new IOError(th2);
-
- logger.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
- writer.resetAndTruncate();
- dataFile.seek(nextRowPositionFromIndex);
- badRows++;
+ writer.append(compactedRow);
+ goodRows++;
}
}
- else
+ catch (Throwable th2)
{
+ throwIfFatal(th2);
// Skipping rows is dangerous for counters (see CASSANDRA-2759)
if (isCommutative)
- throw new IOError(th);
+ throw new IOError(th2);
- logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
- if (currentIndexKey != null)
- dataFile.seek(nextRowPositionFromIndex);
+ logger.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
+ writer.resetAndTruncate();
+ dataFile.seek(nextRowPositionFromIndex);
badRows++;
}
}
+ else
+ {
+ // Skipping rows is dangerous for counters (see CASSANDRA-2759)
+ if (isCommutative)
+ throw new IOError(th);
+
+ logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
+ if (currentIndexKey != null)
+ dataFile.seek(nextRowPositionFromIndex);
+ badRows++;
+ }
}
-
- if (writer.getFilePointer() > 0)
- newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
- }
- finally
- {
- writer.cleanupIfNecessary();
}
- if (newSstable != null)
- {
- cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
- logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
- if (badRows > 0)
- logger.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any");
- }
- else
- {
- cfs.markCompacted(Arrays.asList(sstable));
- if (badRows > 0)
- logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
- else
- logger.info("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
- }
+ if (writer.getFilePointer() > 0)
+ newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
}
finally
{
+ if (writer != null)
+ writer.cleanupIfNecessary();
FileUtils.closeQuietly(dataFile);
FileUtils.closeQuietly(indexFile);
executor.finishCompaction(scrubInfo);
}
+
+ if (newSstable == null)
+ {
+ cfs.markCompacted(Arrays.asList(sstable));
+ if (badRows > 0)
+ logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
+ else
+ logger.info("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
+ }
+ else
+ {
+ cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
+ logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
+ if (badRows > 0)
+ logger.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any");
+ }
}
private void throwIfFatal(Throwable th)
@@ -981,7 +969,7 @@ public class CompactionManager implement
protected CompactionExecutor(int minThreads, int maxThreads, String name, BlockingQueue<Runnable> queue)
{
- super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new NamedThreadFactory(name));
+ super(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue, new NamedThreadFactory(name, Thread.MIN_PRIORITY));
}
private CompactionExecutor(int threadCount, String name)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Thu Oct 6 20:30:17 2011
@@ -169,7 +169,7 @@ public class CompactionTask extends Abst
{
for (SSTableReader sstable : toCompact)
{
- if (sstable.getCachedPosition(row.key) != null)
+ if (sstable.getCachedPosition(row.key, false) != null)
{
cachedKeys.put(row.key, position);
break;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Thu Oct 6 20:30:17 2011
@@ -135,7 +135,7 @@ public class LeveledCompactionStrategy e
public int getEstimatedRemainingTasks()
{
- return 0;
+ return manifest.getEstimatedTasks();
}
public void handleNotification(INotification notification, Object sender)
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java Thu Oct 6 20:30:17 2011
@@ -418,4 +418,17 @@ public class LeveledManifest
{
return generations[i];
}
+
+ public int getEstimatedTasks()
+ {
+ int n = 0;
+ for (int i = generations.length - 1; i >= 0; i--)
+ {
+ List<SSTableReader> sstables = generations[i];
+ if (sstables.isEmpty())
+ continue;
+ n += (SSTableReader.getTotalBytes(sstables) - maxBytesForLevel(i)) / maxSSTableSizeInMB;
+ }
+ return n;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1179832&r1=1179831&r2=1179832&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Oct 6 20:30:17 2011
@@ -572,15 +572,15 @@ public class SSTableReader extends SSTab
keyCache.put(new Pair<Descriptor, DecoratedKey>(descriptor, copiedKey), info);
}
- public Long getCachedPosition(DecoratedKey key)
+ public Long getCachedPosition(DecoratedKey key, boolean updateStats)
{
- return getCachedPosition(new Pair<Descriptor, DecoratedKey>(descriptor, key));
+ return getCachedPosition(new Pair<Descriptor, DecoratedKey>(descriptor, key), updateStats);
}
- private Long getCachedPosition(Pair<Descriptor, DecoratedKey> unifiedKey)
+ private Long getCachedPosition(Pair<Descriptor, DecoratedKey> unifiedKey, boolean updateStats)
{
if (keyCache != null && keyCache.getCapacity() > 0)
- return keyCache.get(unifiedKey);
+ return updateStats ? keyCache.get(unifiedKey) : keyCache.getInternal(unifiedKey);
return null;
}
@@ -603,7 +603,7 @@ public class SSTableReader extends SSTab
if (op == Operator.EQ || op == Operator.GE)
{
Pair<Descriptor, DecoratedKey> unifiedKey = new Pair<Descriptor, DecoratedKey>(descriptor, decoratedKey);
- Long cachedPosition = getCachedPosition(unifiedKey);
+ Long cachedPosition = getCachedPosition(unifiedKey, true);
if (cachedPosition != null)
return cachedPosition;
}