You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/02/23 04:44:26 UTC
svn commit: r1073586 - in /cassandra/branches/cassandra-0.7: ./
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/
src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/io/util/
src/java/org/apache/cassandra/service/ ...
Author: jbellis
Date: Wed Feb 23 03:44:26 2011
New Revision: 1073586
URL: http://svn.apache.org/viewvc?rev=1073586&view=rev
Log:
nodetool scrub
patch by jbellis and tjake; reviewed by slebresne for CASSANDRA-2217
Added:
cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/
cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Data.db (with props)
cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Filter.db (with props)
cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Index.db (with props)
cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Statistics.db (with props)
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java
Modified:
cassandra/branches/cassandra-0.7/CHANGES.txt
cassandra/branches/cassandra-0.7/build.xml
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/PrecompactedRow.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java
cassandra/branches/cassandra-0.7/test/conf/cassandra.yaml
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Feb 23 03:44:26 2011
@@ -15,6 +15,7 @@
* update memtable_throughput to be a long (CASSANDRA-2158)
* fix for compaction and cleanup writing old-format data into new-version
sstable (CASSANDRA-2211, -2216)
+ * add nodetool scrub (CASSANDRA-2217)
* fix sstable2json large-row pagination (CASSANDRA-2188)
* fix EOFing on requests for the last bytes in a file (CASSANDRA-2213)
* fix BRAF performance when seeking to EOF (CASSANDRA-2218)
Modified: cassandra/branches/cassandra-0.7/build.xml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/build.xml?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/build.xml (original)
+++ cassandra/branches/cassandra-0.7/build.xml Wed Feb 23 03:44:26 2011
@@ -561,6 +561,7 @@
<target name="test" depends="build-test" description="Execute unit tests">
<testmacro suitename="unit" inputdir="${test.unit.src}" timeout="60000">
<jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
+ <jvmarg value="-Dcorrupt-sstable-root=${test.data}/corrupt-sstables"/>
</testmacro>
</target>
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Feb 23 03:44:26 2011
@@ -928,6 +928,12 @@ public class ColumnFamilyStore implement
CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
}
+ public void scrub() throws ExecutionException, InterruptedException
+ {
+ snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
+ CompactionManager.instance.performScrub(ColumnFamilyStore.this);
+ }
+
void markCompacted(Collection<SSTableReader> sstables)
{
ssTables.markCompacted(sstables);
@@ -1601,26 +1607,8 @@ public class ColumnFamilyStore implement
return metadata.comparator;
}
- /**
- * Take a snap shot of this columnfamily store.
- *
- * @param snapshotName the name of the associated with the snapshot
- */
- public void snapshot(String snapshotName)
+ private void snapshotWithoutFlush(String snapshotName)
{
- try
- {
- forceBlockingFlush();
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
-
for (SSTableReader ssTable : ssTables)
{
try
@@ -1643,6 +1631,30 @@ public class ColumnFamilyStore implement
}
}
+
+ /**
+ * Flush this column family store, then take a snapshot of it.
+ *
+ * @param snapshotName the name to give the snapshot
+ */
+ public void snapshot(String snapshotName)
+ {
+ try
+ {
+ forceBlockingFlush();
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ snapshotWithoutFlush(snapshotName);
+ }
+
public boolean hasUnreclaimedSpace()
{
return ssTables.getLiveSize() < ssTables.getTotalSize();
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java Wed Feb 23 03:44:26 2011
@@ -43,9 +43,11 @@ import org.apache.cassandra.config.Datab
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.*;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -179,6 +181,28 @@ public class CompactionManager implement
executor.submit(runnable).get();
}
+ public void performScrub(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
+ {
+ Callable<Object> runnable = new Callable<Object>()
+ {
+ public Object call() throws IOException
+ {
+ compactionLock.lock();
+ try
+ {
+ if (!cfStore.isInvalid())
+ doScrub(cfStore);
+ return this;
+ }
+ finally
+ {
+ compactionLock.unlock();
+ }
+ }
+ };
+ executor.submit(runnable).get();
+ }
+
public void performMajor(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
{
submitMajor(cfStore, 0, getDefaultGcBefore(cfStore)).get();
@@ -473,6 +497,101 @@ public class CompactionManager implement
}
/**
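+ * Deserialize every row of every sstable in the CFS and re-serialize it at the newest version. Also attempts
+ * to recover from bogus row keys / sizes using the row position recorded in the primary index, and skips rows
+ * that still cannot be read (e.g. garbage columns resulting from early ByteBuffer bugs).
+ *
+ * @throws IOException if no data directory has room for the rewritten sstable ("disk full"), or on a read/write error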
+ */
+ private void doScrub(ColumnFamilyStore cfs) throws IOException
+ {
+ assert !cfs.isIndex();
+ Table table = cfs.table;
+ Collection<Range> ranges = StorageService.instance.getLocalRanges(table.name);
+
+ for (final SSTableReader sstable : cfs.getSSTables())
+ {
+ logger.info("Scrubbing " + sstable);
+
+ // Find a data directory for the rewritten sstable, sized by the original sstable's length
+ String compactionFileLocation = table.getDataFileLocation(sstable.length());
+ if (compactionFileLocation == null)
+ throw new IOException("disk full");
+
+ int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
+ (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+ if (logger.isDebugEnabled())
+ logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+
+ // loop through each row, deserializing to check for damage.
+ // we'll also loop through the index at the same time, using the position from the index to recover if the
+ // row header (key or data size) is corrupt. (This means our position in the index file will be one row
+ // "ahead" of the data file.)
+ final BufferedRandomAccessFile dataFile = BufferedRandomAccessFile.getUncachingReader(sstable.getFilename());
+ String indexFilename = sstable.descriptor.filenameFor(Component.PRIMARY_INDEX);
+ BufferedRandomAccessFile indexFile = BufferedRandomAccessFile.getUncachingReader(indexFilename);
+ ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+ assert indexFile.readLong() == 0;
+
+ SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null);
+ executor.beginCompaction(cfs.columnFamily, new ScrubInfo(dataFile, sstable));
+
+ while (!dataFile.isEOF())
+ {
+ long rowStart = dataFile.getFilePointer();
+ DecoratedKey key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+ ByteBuffer currentIndexKey = nextIndexKey;
+ nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+ long nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+
+ long dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+ long dataStart = dataFile.getFilePointer();
+
+ SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+ writer.mark();
+ try
+ {
+ writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
+ }
+ catch (Exception e)
+ {
+ logger.warn("Error reading row " + ByteBufferUtil.bytesToHex(key.key) + "(stacktrace follows)", e);
+ writer.reset();
+
+ long dataStartFromIndex = rowStart + 2 + currentIndexKey.remaining();
+ if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
+ {
+ logger.info(String.format("Retrying %s as key %s from row index",
+ ByteBufferUtil.bytesToHex(key.key), ByteBufferUtil.bytesToHex(currentIndexKey)));
+ key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
+ long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+ row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+ try
+ {
+ writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
+ }
+ catch (Exception e2)
+ {
+ logger.info("Retry failed too. Skipping to next row (retry's stacktrace follows)", e2);
+ writer.reset();
+ dataFile.seek(nextRowPositionFromIndex);
+ }
+ }
+ else
+ {
+ logger.info("Skipping to next row");
+ dataFile.seek(nextRowPositionFromIndex);
+ }
+ }
+ }
+
+ SSTableReader newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+ cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
+ logger.info("Scrub of " + sstable + " complete");
+ }
+ }
+
+ /**
* This function goes over each file and removes the keys that the node is not responsible for
* and only keeps keys that this node is responsible for.
*
@@ -497,7 +616,7 @@ public class CompactionManager implement
long totalkeysWritten = 0;
int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
- (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable)) / 2));
+ (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
@@ -513,7 +632,7 @@ public class CompactionManager implement
if (Range.isTokenInRanges(row.getKey().token, ranges))
{
writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer);
- writer.append(getCompactedRow(row, cfs, sstable.descriptor));
+ writer.append(getCompactedRow(row, cfs, sstable.descriptor, true));
totalkeysWritten++;
}
else
@@ -571,14 +690,14 @@ public class CompactionManager implement
* If the data is from a current-version sstable, write it unchanged. Otherwise,
* re-serialize it in the latest version.
*/
- private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, ColumnFamilyStore cfs, Descriptor descriptor)
+ private AbstractCompactedRow getCompactedRow(SSTableIdentityIterator row, ColumnFamilyStore cfs, Descriptor descriptor, boolean forceDeserialize)
{
- if (descriptor.isLatestVersion)
+ if (descriptor.isLatestVersion && !forceDeserialize)
return new EchoedRow(row);
return row.dataSize > DatabaseDescriptor.getInMemoryCompactionLimit()
- ? new LazilyCompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs))
- : new PrecompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs));
+ ? new LazilyCompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize)
+ : new PrecompactedRow(cfs, Arrays.asList(row), false, getDefaultGcBefore(cfs), forceDeserialize);
}
private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer)
@@ -983,4 +1102,38 @@ public class CompactionManager implement
return "Cleanup of " + sstable.getColumnFamilyName();
}
}
+
+ private static class ScrubInfo implements ICompactionInfo
+ {
+ private final BufferedRandomAccessFile dataFile;
+ private final SSTableReader sstable;
+
+ public ScrubInfo(BufferedRandomAccessFile dataFile, SSTableReader sstable)
+ {
+ this.dataFile = dataFile;
+ this.sstable = sstable;
+ }
+
+ public long getTotalBytes()
+ {
+ try
+ {
+ return dataFile.length();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public long getBytesComplete()
+ {
+ return dataFile.getFilePointer();
+ }
+
+ public String getTaskType()
+ {
+ return "Scrub " + sstable;
+ }
+ }
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java Wed Feb 23 03:44:26 2011
@@ -134,9 +134,9 @@ implements Closeable, ICompactionInfo
{
logger.info(String.format("Compacting large row %s (%d bytes) incrementally",
ByteBufferUtil.bytesToHex(rows.get(0).getKey().key), rowSize));
- return new LazilyCompactedRow(cfs, rows, major, gcBefore);
+ return new LazilyCompactedRow(cfs, rows, major, gcBefore, false);
}
- return new PrecompactedRow(cfs, rows, major, gcBefore);
+ return new PrecompactedRow(cfs, rows, major, gcBefore, false);
}
public void close() throws IOException
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/LazilyCompactedRow.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/LazilyCompactedRow.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/LazilyCompactedRow.java Wed Feb 23 03:44:26 2011
@@ -59,15 +59,17 @@ public class LazilyCompactedRow extends
private final boolean shouldPurge;
private final int gcBefore;
private final DataOutputBuffer headerBuffer;
+ private final boolean forceDeserialize;
private ColumnFamily emptyColumnFamily;
private LazyColumnIterator iter;
private int columnCount;
private long columnSerializedSize;
- public LazilyCompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
+ public LazilyCompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore, boolean forceDeserialize)
{
super(rows.get(0).getKey());
this.gcBefore = gcBefore;
+ this.forceDeserialize = forceDeserialize;
this.rows = new ArrayList<SSTableIdentityIterator>(rows);
Set<SSTable> sstables = new HashSet<SSTable>();
@@ -94,7 +96,7 @@ public class LazilyCompactedRow extends
public void write(DataOutput out) throws IOException
{
- if (rows.size() == 1 && !shouldPurge && rows.get(0).sstable.descriptor.isLatestVersion)
+ if (rows.size() == 1 && !shouldPurge && rows.get(0).sstable.descriptor.isLatestVersion && !forceDeserialize)
{
SSTableIdentityIterator row = rows.get(0);
out.writeLong(row.dataSize);
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/PrecompactedRow.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/PrecompactedRow.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/PrecompactedRow.java Wed Feb 23 03:44:26 2011
@@ -55,7 +55,7 @@ public class PrecompactedRow extends Abs
this.buffer = buffer;
}
- public PrecompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore)
+ public PrecompactedRow(ColumnFamilyStore cfStore, List<SSTableIdentityIterator> rows, boolean major, int gcBefore, boolean forceDeserialize)
{
super(rows.get(0).getKey());
buffer = new DataOutputBuffer();
@@ -67,7 +67,7 @@ public class PrecompactedRow extends Abs
}
boolean shouldPurge = major || !cfStore.isKeyInRemainingSSTables(key, sstables);
- if (rows.size() > 1 || shouldPurge || !rows.get(0).sstable.descriptor.isLatestVersion)
+ if (rows.size() > 1 || shouldPurge || !rows.get(0).sstable.descriptor.isLatestVersion || forceDeserialize)
{
ColumnFamily cf = null;
for (SSTableIdentityIterator row : rows)
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java Wed Feb 23 03:44:26 2011
@@ -24,15 +24,22 @@ package org.apache.cassandra.io.sstable;
import java.io.DataOutput;
import java.io.IOError;
import java.io.IOException;
+import java.util.ArrayList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.utils.Filter;
public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, IColumnIterator
{
+ private static final Logger logger = LoggerFactory.getLogger(SSTableIdentityIterator.class);
+
private final DecoratedKey key;
private final long finishedAt;
private final BufferedRandomAccessFile file;
@@ -56,6 +63,12 @@ public class SSTableIdentityIterator imp
public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize)
throws IOException
{
+ this(sstable, file, key, dataStart, dataSize, false);
+ }
+
+ public SSTableIdentityIterator(SSTableReader sstable, BufferedRandomAccessFile file, DecoratedKey key, long dataStart, long dataSize, boolean deserializeRowHeader)
+ throws IOException
+ {
this.sstable = sstable;
this.file = file;
this.key = key;
@@ -66,6 +79,28 @@ public class SSTableIdentityIterator imp
try
{
file.seek(this.dataStart);
+ if (deserializeRowHeader)
+ {
+ try
+ {
+ IndexHelper.defreezeBloomFilter(file, sstable.descriptor.usesOldBloomFilter);
+ }
+ catch (Exception e)
+ {
+ logger.info("Invalid bloom filter in " + sstable + "; will rebuild it");
+ // assumes the failed defreeze left the file positioned at the row index; if not, the index check below fails and logs as well
+ }
+ try
+ {
+ IndexHelper.deserializeIndex(file);
+ }
+ catch (Exception e)
+ {
+ logger.info("Invalid row summary in " + sstable + "; will rebuild it");
+ }
+ file.seek(this.dataStart);
+ }
+
IndexHelper.skipBloomFilter(file);
IndexHelper.skipIndex(file);
columnFamily = sstable.createColumnFamily();
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Wed Feb 23 03:44:26 2011
@@ -53,10 +53,9 @@ public class BufferedRandomAccessFile ex
// buffer which will cache file blocks
private ByteBuffer buffer;
- // `current` is the current user-visible position in in the file, i.e., corresponding to getFilePointer() or seek()
- // `bufferOffset` is the position in the file of the beginning of the buffer
+ // `current` is the current user-visible position in the file, i.e. the value reported by getFilePointer()
+ // `bufferOffset` is the position in the file of the beginning of the buffer
// `bufferEnd` is `bufferOffset` + count of bytes read from file, i.e. the lowest position we can't read from the buffer
- // (NOT the same as bufferOffset + buffer.length since buffer may not be completely full)
private long bufferOffset, bufferEnd, current = 0;
// max buffer size is set according to (int size) parameter in the
@@ -202,6 +201,7 @@ public class BufferedRandomAccessFile ex
buffer.rewind();
bufferEnd = bufferOffset;
hitEOF = true;
+
return 0;
}
@@ -466,6 +466,11 @@ public class BufferedRandomAccessFile ex
return (int) bytes;
}
+ public static BufferedRandomAccessFile getUncachingReader(String filename) throws IOException
+ {
+ return new BufferedRandomAccessFile(new File(filename), "r", 8 * 1024 * 1024, true);
+ }
+
/**
* Class to hold a mark to the position of the file
*/
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Wed Feb 23 03:44:26 2011
@@ -1234,13 +1234,19 @@ public class StorageService implements I
{
if (tableName.equals("system"))
throw new RuntimeException("Cleanup of the system table is neither necessary nor wise");
-
+
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
{
cfStore.forceCleanup();
}
}
+ public void scrub(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
+ for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
+ cfStore.scrub();
+ }
+
public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Feb 23 03:44:26 2011
@@ -158,6 +158,14 @@ public interface StorageServiceMBean
public void forceTableCleanup(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
/**
+ * Scrub (deserialize + reserialize at the latest version, skipping bad rows if any) the given keyspace.
+ * If columnFamilies array is empty, all CFs are scrubbed.
+ *
+ * Scrubbed CFs will be snapshotted first.
+ */
+ public void scrub(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+
+ /**
* Flush all memtables for the given column families, or all columnfamilies for the given table
* if none are explicitly listed.
* @param tableName
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java Wed Feb 23 03:44:26 2011
@@ -72,7 +72,7 @@ public class NodeCmd {
public enum NodeCommand {
RING, INFO, CFSTATS, SNAPSHOT, CLEARSNAPSHOT, VERSION, TPSTATS, FLUSH, DRAIN,
- DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT,
+ DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT, SCRUB,
SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS,
COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE,
DISABLETHRIFT, ENABLETHRIFT
@@ -114,6 +114,7 @@ public class NodeCmd {
addCmdHelp(header, "repair [keyspace] [cfnames]", "Repair one or more column family");
addCmdHelp(header, "cleanup [keyspace] [cfnames]", "Run cleanup on one or more column family");
addCmdHelp(header, "compact [keyspace] [cfnames]", "Force a (major) compaction on one or more column family");
+ addCmdHelp(header, "scrub [keyspace] [cfnames]", "Scrub (rebuild sstables for) one or more column family");
addCmdHelp(header, "invalidatekeycache [keyspace] [cfnames]", "Invalidate the key cache of one or more column family");
addCmdHelp(header, "invalidaterowcache [keyspace] [cfnames]", "Invalidate the key cache of one or more column family");
addCmdHelp(header, "getcompactionthreshold <keyspace> <cfname>", "Print min and max compaction thresholds for a given column family");
@@ -574,6 +575,7 @@ public class NodeCmd {
case COMPACT :
case REPAIR :
case FLUSH :
+ case SCRUB :
case INVALIDATEKEYCACHE :
case INVALIDATEROWCACHE :
optionalKSandCFs(nc, arguments, probe);
@@ -628,51 +630,22 @@ public class NodeCmd {
private static void optionalKSandCFs(NodeCommand nc, String[] cmdArgs, NodeProbe probe) throws InterruptedException, IOException
{
- // Per-keyspace
- if (cmdArgs.length == 1)
+ // cmdArgs[0] is the command name (e.g. "scrub"); with no other args, every keyspace is processed
+ // if there is one additional arg, it's the keyspace; any further args are column family names
+ List<String> keyspaces = cmdArgs.length == 1 ? probe.getKeyspaces() : Arrays.asList(cmdArgs[1]);
+ for (String keyspace : keyspaces)
{
- for (String keyspace : probe.getKeyspaces())
- {
- switch (nc)
- {
- case REPAIR : probe.forceTableRepair(keyspace); break;
- case INVALIDATEKEYCACHE : probe.invalidateKeyCaches(keyspace); break;
- case INVALIDATEROWCACHE : probe.invalidateRowCaches(keyspace); break;
- case FLUSH :
- try { probe.forceTableFlush(keyspace); }
- catch (ExecutionException ee) { err(ee, "Error occured while flushing keyspace " + keyspace); }
- break;
- case COMPACT :
- try { probe.forceTableCompaction(keyspace); }
- catch (ExecutionException ee) { err(ee, "Error occured while compacting keyspace " + keyspace); }
- break;
- case CLEANUP :
- if (keyspace.equals("system")) { break; } // Skip cleanup on system cfs.
- try { probe.forceTableCleanup(keyspace); }
- catch (ExecutionException ee) { err(ee, "Error occured while cleaning up keyspace " + keyspace); }
- break;
- default:
- throw new RuntimeException("Unreachable code.");
- }
- }
- }
- // Per-cf (or listed cfs) in given keyspace
- else
- {
- String keyspace = cmdArgs[1];
-
- // Check if this keyspace exists
if (!probe.getKeyspaces().contains(keyspace))
{
System.err.println("Keyspace [" + keyspace + "] does not exist.");
System.exit(1);
}
-
- String[] columnFamilies = new String[cmdArgs.length - 2];
- for (int i = 0; i < columnFamilies.length; i++)
- {
- columnFamilies[i] = cmdArgs[i + 2];
- }
+ }
+
+ // separate loop from the validation above, so an invalid keyspace exits before any command has run
+ for (String keyspace : keyspaces)
+ {
+ String[] columnFamilies = cmdArgs.length <= 2 ? new String[0] : Arrays.copyOfRange(cmdArgs, 2, cmdArgs.length);
switch (nc)
{
case REPAIR : probe.forceTableRepair(keyspace, columnFamilies); break;
@@ -687,9 +660,14 @@ public class NodeCmd {
catch (ExecutionException ee) { err(ee, "Error occured during compaction"); }
break;
case CLEANUP :
+ if (keyspace.equals("system")) { break; } // Skip cleanup on system cfs.
try { probe.forceTableCleanup(keyspace, columnFamilies); }
catch (ExecutionException ee) { err(ee, "Error occured during cleanup"); }
break;
+ case SCRUB :
+ try { probe.scrub(keyspace, columnFamilies); }
+ catch (ExecutionException ee) { err(ee, "Error occured while scrubbing keyspace " + keyspace); }
+ break;
default:
throw new RuntimeException("Unreachable code.");
}
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Feb 23 03:44:26 2011
@@ -167,6 +167,11 @@ public class NodeProbe
ssProxy.forceTableCleanup(tableName, columnFamilies);
}
+ public void scrub(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+ {
+ ssProxy.scrub(tableName, columnFamilies);
+ }
+
public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
ssProxy.forceTableCompaction(tableName, columnFamilies);
Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/CLibrary.java Wed Feb 23 03:44:26 2011
@@ -157,7 +157,7 @@ public final class CLibrary
}
}
- private static void createHardLinkWithExec(File sourceFile, File destinationFile) throws IOException
+ public static void createHardLinkWithExec(File sourceFile, File destinationFile) throws IOException
{
String osname = System.getProperty("os.name");
ProcessBuilder pb;
Modified: cassandra/branches/cassandra-0.7/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/conf/cassandra.yaml?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-0.7/test/conf/cassandra.yaml Wed Feb 23 03:44:26 2011
@@ -82,6 +82,11 @@ keyspaces:
rows_cached: 0
keys_cached: 0
+ - name: Super5
+ column_type: Super
+ rows_cached: 0
+ keys_cached: 0
+
- name: Indexed1
column_metadata:
- name: birthdate
Added: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Data.db
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Data.db?rev=1073586&view=auto
==============================================================================
Binary file - no diff available.
Propchange: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Data.db
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Filter.db
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Filter.db?rev=1073586&view=auto
==============================================================================
Binary file - no diff available.
Propchange: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Filter.db
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Index.db
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Index.db?rev=1073586&view=auto
==============================================================================
Binary file - no diff available.
Propchange: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Index.db
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Statistics.db
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Statistics.db?rev=1073586&view=auto
==============================================================================
Binary file - no diff available.
Propchange: cassandra/branches/cassandra-0.7/test/data/corrupt-sstables/Super5-f-2-Statistics.db
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java?rev=1073586&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/ScrubTest.java Wed Feb 23 03:44:26 2011
@@ -0,0 +1,152 @@
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CLibrary;
+
+import static org.apache.cassandra.Util.column;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ScrubTest extends CleanupHelper
+{
+ public String TABLE = "Keyspace1";
+ public String CF = "Standard1";
+ public String CF2 = "Super5";
+ public String corruptSSTableName;
+
+
+ public void copySSTables() throws IOException
+ {
+ String root = System.getProperty("corrupt-sstable-root");
+ assert root != null;
+ File rootDir = new File(root);
+ assert rootDir.isDirectory();
+
+ String[] destDirs = DatabaseDescriptor.getAllDataFileLocationsForTable(TABLE);
+ assert destDirs != null;
+ assert destDirs.length > 0;
+
+ FileUtils.createDirectory(destDirs[0]);
+ for (File srcFile : rootDir.listFiles())
+ {
+ if (srcFile.getName().equals(".svn"))
+ continue;
+ File destFile = new File(destDirs[0]+File.separator+srcFile.getName());
+ CLibrary.createHardLinkWithExec(srcFile, destFile);
+
+ destFile = new File(destDirs[0]+File.separator+srcFile.getName());
+
+ assert destFile.exists() : destFile.getAbsoluteFile();
+
+ if(destFile.getName().endsWith("Data.db"))
+ corruptSSTableName = destFile.getCanonicalPath();
+ }
+
+ assert corruptSSTableName != null;
+ }
+
+ @Test
+ public void testScrubFile() throws Exception
+ {
+ copySSTables();
+
+ Table table = Table.open(TABLE);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CF2);
+ assert cfs.getSSTables().size() > 0;
+
+ List<Row> rows;
+ boolean caught = false;
+ try
+ {
+ rows = cfs.getRangeSlice(ByteBufferUtil.bytes("1"), Util.range("", ""), 1000, new IdentityQueryFilter());
+ fail("This slice should fail");
+ }
+ catch (NegativeArraySizeException e)
+ {
+ caught = true;
+ }
+ assert caught : "'corrupt' test file actually was not";
+
+ CompactionManager.instance.performScrub(cfs);
+ rows = cfs.getRangeSlice(ByteBufferUtil.bytes("1"), Util.range("", ""), 1000, new IdentityQueryFilter());
+ assertEquals(100, rows.size());
+ }
+
+
+ @Test
+ public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ Table table = Table.open(TABLE);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CF);
+
+ List<Row> rows;
+
+ // insert data and verify we get it back w/ range query
+ fillCF(cfs, 1);
+ rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+ assertEquals(1, rows.size());
+
+ CompactionManager.instance.performScrub(cfs);
+
+ // check data is still there
+ rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+ assertEquals(1, rows.size());
+ }
+
+ @Test
+ public void testScrubMultiRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ Table table = Table.open(TABLE);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CF);
+
+ List<Row> rows;
+
+ // insert data and verify we get it back w/ range query
+ fillCF(cfs, 10);
+ rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+ assertEquals(10, rows.size());
+
+ CompactionManager.instance.performScrub(cfs);
+
+ // check data is still there
+ rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+ assertEquals(10, rows.size());
+ }
+
+ protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException
+ {
+ for (int i = 0; i < rowsPerSSTable; i++)
+ {
+ String key = String.valueOf(i);
+ // create a row and update the birthdate value, test that the index query fetches the new version
+ RowMutation rm;
+ rm = new RowMutation(TABLE, ByteBufferUtil.bytes(key));
+ ColumnFamily cf = ColumnFamily.create(TABLE, CF);
+ cf.addColumn(column("c1", "1", 1L));
+ cf.addColumn(column("c2", "2", 1L));
+ rm.add(cf);
+ rm.applyUnsafe();
+ }
+
+ cfs.forceBlockingFlush();
+ }
+
+
+
+
+}
Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1073586&r1=1073585&r2=1073586&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java (original)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java Wed Feb 23 03:44:26 2011
@@ -234,7 +234,7 @@ public class LazilyCompactedRowTest exte
@Override
protected AbstractCompactedRow getCompactedRow()
{
- return new LazilyCompactedRow(cfStore, rows, true, Integer.MAX_VALUE);
+ return new LazilyCompactedRow(cfStore, rows, true, Integer.MAX_VALUE, true); // NOTE(review): last arg fills the constructor parameter added in this commit; confirm its meaning in LazilyCompactedRow
}
}
}