You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/06/29 10:49:01 UTC
[2/3] git commit: Improve scrub and allow it to be run offline
Improve scrub and allow it to be run offline
patch by slebresne; reviewed by jbellis for CASSANDRA-4321
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/75453d01
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/75453d01
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/75453d01
Branch: refs/heads/cassandra-1.1
Commit: 75453d0179b841ef2e63475f81484661c2e502cb
Parents: 88b48df
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Jun 29 10:37:10 2012 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Jun 29 10:37:10 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
bin/sstablescrub | 50 ++
.../org/apache/cassandra/db/ColumnFamilyStore.java | 26 +-
src/java/org/apache/cassandra/db/DefsTable.java | 2 +-
src/java/org/apache/cassandra/db/Table.java | 20 +-
.../db/compaction/CompactionController.java | 20 +-
.../cassandra/db/compaction/CompactionManager.java | 226 +---------
.../cassandra/db/compaction/LeveledManifest.java | 21 +-
.../apache/cassandra/db/compaction/Scrubber.java | 355 +++++++++++++++
.../apache/cassandra/hadoop/BulkRecordWriter.java | 6 +-
.../apache/cassandra/io/sstable/SSTableLoader.java | 12 +-
.../apache/cassandra/io/sstable/SSTableReader.java | 27 +-
.../apache/cassandra/service/StorageService.java | 9 +-
.../org/apache/cassandra/tools/BulkLoader.java | 24 +-
.../apache/cassandra/tools/StandaloneScrubber.java | 282 ++++++++++++
.../org/apache/cassandra/utils/OutputHandler.java | 95 ++++
test/unit/org/apache/cassandra/db/ScrubTest.java | 70 +++-
17 files changed, 963 insertions(+), 283 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e9f34ed..65f9fbb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,7 @@
* (cql3) allow updating column_alias types (CASSANDRA-4041)
* (cql3) Fix deletion bug (CASSANDRA-4193)
* Fix computation of overlapping sstable for leveled compaction (CASSANDRA-4321)
+ * Improve scrub and allow to run it offline (CASSANDRA-4321)
Merged from 1.0:
* Set gc_grace on index CF to 0 (CASSANDRA-4314)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/bin/sstablescrub
----------------------------------------------------------------------
diff --git a/bin/sstablescrub b/bin/sstablescrub
new file mode 100644
index 0000000..437bee0
--- /dev/null
+++ b/bin/sstablescrub
@@ -0,0 +1,50 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+ for include in /usr/share/cassandra/cassandra.in.sh \
+ /usr/local/share/cassandra/cassandra.in.sh \
+ /opt/cassandra/cassandra.in.sh \
+ ~/.cassandra.in.sh \
+ `dirname $0`/cassandra.in.sh; do
+ if [ -r $include ]; then
+ . $include
+ break
+ fi
+ done
+elif [ -r $CASSANDRA_INCLUDE ]; then
+ . $CASSANDRA_INCLUDE
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x $JAVA_HOME/bin/java ]; then
+ JAVA=$JAVA_HOME/bin/java
+else
+ JAVA=`which java`
+fi
+
+if [ -z $CLASSPATH ]; then
+ echo "You must set the CLASSPATH var" >&2
+ exit 1
+fi
+
+$JAVA -ea -cp $CLASSPATH -Xmx256M \
+ -Dlog4j.configuration=log4j-tools.properties \
+ org.apache.cassandra.tools.StandaloneScrubber "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 5c7e3b2..a40e52e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -205,7 +205,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
IPartitioner partitioner,
int generation,
CFMetaData metadata,
- Directories directories)
+ Directories directories,
+ boolean loadSSTables)
{
assert metadata != null : "null metadata for " + table + ":" + columnFamilyName;
@@ -230,8 +231,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
? Collections.<DecoratedKey>emptySet()
: CacheService.instance.keyCache.readSaved(table.name, columnFamily, partitioner);
- Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true);
- data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), savedKeys, data, metadata, this.partitioner));
+ if (loadSSTables)
+ {
+ Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true);
+ data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), savedKeys, data, metadata, this.partitioner));
+ }
// compaction strategy should be created after the CFS has been prepared
this.compactionStrategy = metadata.createCompactionStrategyInstance(this);
@@ -304,15 +308,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return data.getMeanColumns();
}
- public static ColumnFamilyStore createColumnFamilyStore(Table table, String columnFamily)
+ public static ColumnFamilyStore createColumnFamilyStore(Table table, String columnFamily, boolean loadSSTables)
+ {
+ return createColumnFamilyStore(table, columnFamily, StorageService.getPartitioner(), Schema.instance.getCFMetaData(table.name, columnFamily), loadSSTables);
+ }
+
+ public static ColumnFamilyStore createColumnFamilyStore(Table table, String columnFamily, IPartitioner partitioner, CFMetaData metadata)
{
- return createColumnFamilyStore(table, columnFamily, StorageService.getPartitioner(), Schema.instance.getCFMetaData(table.name, columnFamily));
+ return createColumnFamilyStore(table, columnFamily, partitioner, metadata, true);
}
- public static synchronized ColumnFamilyStore createColumnFamilyStore(Table table,
+ private static synchronized ColumnFamilyStore createColumnFamilyStore(Table table,
String columnFamily,
IPartitioner partitioner,
- CFMetaData metadata)
+ CFMetaData metadata,
+ boolean loadSSTables)
{
// get the max generation number, to prevent generation conflicts
Directories directories = Directories.create(table.name, columnFamily);
@@ -328,7 +338,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Collections.sort(generations);
int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0;
- return new ColumnFamilyStore(table, columnFamily, partitioner, value, metadata, directories);
+ return new ColumnFamilyStore(table, columnFamily, partitioner, value, metadata, directories, loadSSTables);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java
index be23934..93f9867 100644
--- a/src/java/org/apache/cassandra/db/DefsTable.java
+++ b/src/java/org/apache/cassandra/db/DefsTable.java
@@ -433,7 +433,7 @@ public class DefsTable
Schema.instance.setTableDefinition(ksm);
if (!StorageService.instance.isClientMode())
- Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName);
+ Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true);
}
private static void updateKeyspace(KSMetaData newState) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java
index 63e7a96..f3a414e 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -94,10 +94,15 @@ public class Table
public static Table open(String table)
{
- return open(table, Schema.instance);
+ return open(table, Schema.instance, true);
}
- public static Table open(String table, Schema schema)
+ public static Table openWithoutSSTables(String table)
+ {
+ return open(table, Schema.instance, false);
+ }
+
+ private static Table open(String table, Schema schema, boolean loadSSTables)
{
Table tableInstance = schema.getTableInstance(table);
@@ -111,7 +116,7 @@ public class Table
if (tableInstance == null)
{
// open and store the table
- tableInstance = new Table(table);
+ tableInstance = new Table(table, loadSSTables);
schema.storeTableInstance(tableInstance);
// table has to be constructed and in the cache before cacheRow can be called
@@ -275,7 +280,7 @@ public class Table
return list;
}
- private Table(String table)
+ private Table(String table, boolean loadSSTables)
{
name = table;
KSMetaData ksm = Schema.instance.getKSMetaData(table);
@@ -296,9 +301,8 @@ public class Table
for (CFMetaData cfm : new ArrayList<CFMetaData>(Schema.instance.getTableDefinition(table).cfMetaData().values()))
{
logger.debug("Initializing {}.{}", name, cfm.cfName);
- initCf(cfm.cfId, cfm.cfName);
+ initCf(cfm.cfId, cfm.cfName, loadSSTables);
}
-
}
public void createReplicationStrategy(KSMetaData ksm) throws ConfigurationException
@@ -343,7 +347,7 @@ public class Table
}
/** adds a cf to internal structures, ends up creating disk files). */
- public void initCf(Integer cfId, String cfName)
+ public void initCf(Integer cfId, String cfName, boolean loadSSTables)
{
if (columnFamilyStores.containsKey(cfId))
{
@@ -364,7 +368,7 @@ public class Table
}
else
{
- columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName));
+ columnFamilyStores.put(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 8ff871e..1a90ab0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -66,6 +66,19 @@ public class CompactionController
public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
{
+ this(cfs,
+ gcBefore,
+ forceDeserialize || !allLatestVersion(sstables),
+ DataTracker.buildIntervalTree(cfs.getOverlappingSSTables(sstables)),
+ cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables)));
+ }
+
+ protected CompactionController(ColumnFamilyStore cfs,
+ int gcBefore,
+ boolean deserializeRequired,
+ IntervalTree<SSTableReader> overlappingTree,
+ boolean keyExistenceIsExpensive)
+ {
assert cfs != null;
this.cfs = cfs;
this.gcBefore = gcBefore;
@@ -74,10 +87,9 @@ public class CompactionController
// add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our
// current 'stop all write during memtable switch' situation).
this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
- deserializeRequired = forceDeserialize || !allLatestVersion(sstables);
- Set<SSTableReader> overlappingSSTables = cfs.getOverlappingSSTables(sstables);
- overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables);
- keyExistenceIsExpensive = cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables));
+ this.deserializeRequired = deserializeRequired;
+ this.overlappingTree = overlappingTree;
+ this.keyExistenceIsExpensive = keyExistenceIsExpensive;
}
public String getKeyspace()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index fb5b253..3c50a09 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -473,203 +473,27 @@ public class CompactionManager implements CompactionManagerMBean
private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
{
- logger.info("Scrubbing " + sstable);
- CompactionController controller = new CompactionController(cfs, Collections.singletonList(sstable), getDefaultGcBefore(cfs), true);
- boolean isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
+ Scrubber scrubber = new Scrubber(cfs, sstable);
- // Calculate the expected compacted filesize
- File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables(sstable.onDiskLength());
- if (compactionFileLocation == null)
- throw new IOException("disk full");
- int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
- (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
-
- // loop through each row, deserializing to check for damage.
- // we'll also loop through the index at the same time, using the position from the index to recover if the
- // row header (key or data size) is corrupt. (This means our position in the index file will be one row
- // "ahead" of the data file.)
- final RandomAccessReader dataFile = sstable.openDataReader(true);
- long rowsRead = 0;
- RandomAccessReader indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
- ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
+ CompactionInfo.Holder scrubInfo = scrubber.getScrubInfo();
executor.beginCompaction(scrubInfo);
-
- SSTableWriter writer = null;
- SSTableReader newSstable = null;
- int goodRows = 0, badRows = 0, emptyRows = 0;
-
try
{
- ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
- {
- // throw away variable so we don't have a side effect in the assert
- long firstRowPositionFromIndex = indexFile.readLong();
- assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
- }
-
- // TODO errors when creating the writer may leave empty temp files.
- writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null, Collections.singletonList(sstable));
-
- while (!dataFile.isEOF())
- {
- if (scrubInfo.isStopRequested())
- throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
- long rowStart = dataFile.getFilePointer();
- if (logger.isDebugEnabled())
- logger.debug("Reading row at " + rowStart);
-
- DecoratedKey key = null;
- long dataSize = -1;
- try
- {
- key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
- dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
- if (logger.isDebugEnabled())
- logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
- }
- catch (Throwable th)
- {
- throwIfFatal(th);
- // check for null key below
- }
-
- ByteBuffer currentIndexKey = nextIndexKey;
- long nextRowPositionFromIndex;
- try
- {
- nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
- nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
- }
- catch (Throwable th)
- {
- logger.warn("Error reading index file", th);
- nextIndexKey = null;
- nextRowPositionFromIndex = dataFile.length();
- }
-
- long dataStart = dataFile.getFilePointer();
- long dataStartFromIndex = currentIndexKey == null
- ? -1
- : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
- long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
- assert currentIndexKey != null || indexFile.isEOF();
- if (logger.isDebugEnabled() && currentIndexKey != null)
- logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
-
- writer.mark();
- try
- {
- if (key == null)
- throw new IOError(new IOException("Unable to read row key from data file"));
- if (dataSize > dataFile.length())
- throw new IOError(new IOException("Impossible row size " + dataSize));
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
- {
- emptyRows++;
- }
- else
- {
- writer.append(compactedRow);
- goodRows++;
- }
- if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
- logger.warn("Index file contained a different key or row size; using key from data file");
- }
- catch (Throwable th)
- {
- throwIfFatal(th);
- logger.warn("Non-fatal error reading row (stacktrace follows)", th);
- writer.resetAndTruncate();
-
- if (currentIndexKey != null
- && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
- {
- logger.info(String.format("Retrying from row index; data is %s bytes starting at %s",
- dataSizeFromIndex, dataStartFromIndex));
- key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
- try
- {
- SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
- AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
- if (compactedRow.isEmpty())
- {
- emptyRows++;
- }
- else
- {
- writer.append(compactedRow);
- goodRows++;
- }
- }
- catch (Throwable th2)
- {
- throwIfFatal(th2);
- // Skipping rows is dangerous for counters (see CASSANDRA-2759)
- if (isCommutative)
- throw new IOError(th2);
-
- logger.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
- writer.resetAndTruncate();
- dataFile.seek(nextRowPositionFromIndex);
- badRows++;
- }
- }
- else
- {
- // Skipping rows is dangerous for counters (see CASSANDRA-2759)
- if (isCommutative)
- throw new IOError(th);
-
- logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
- if (currentIndexKey != null)
- dataFile.seek(nextRowPositionFromIndex);
- badRows++;
- }
- }
- if ((rowsRead++ % 1000) == 0)
- controller.mayThrottle(dataFile.getFilePointer());
- }
-
- if (writer.getFilePointer() > 0)
- newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
- }
- catch (Exception e)
- {
- if (writer != null)
- writer.abort();
- throw FBUtilities.unchecked(e);
+ scrubber.scrub();
}
finally
{
- FileUtils.closeQuietly(dataFile);
- FileUtils.closeQuietly(indexFile);
-
+ scrubber.close();
executor.finishCompaction(scrubInfo);
}
- if (newSstable == null)
- {
- cfs.markCompacted(Arrays.asList(sstable), OperationType.SCRUB);
- if (badRows > 0)
- logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
- else
- logger.info("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
- }
- else
- {
- cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable), OperationType.SCRUB);
- logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
- if (badRows > 0)
- logger.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any");
- }
- }
+ if (scrubber.getNewInOrderSSTable() != null)
+ cfs.addSSTable(scrubber.getNewInOrderSSTable());
- private void throwIfFatal(Throwable th)
- {
- if (th instanceof Error && !(th instanceof AssertionError || th instanceof IOError))
- throw (Error) th;
+ if (scrubber.getNewSSTable() == null)
+ cfs.markCompacted(Collections.singletonList(sstable), OperationType.SCRUB);
+ else
+ cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
}
/**
@@ -819,8 +643,8 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, File compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer, Collection<SSTableReader> sstables)
- throws IOException
+ public static SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, File compactionFileLocation, int expectedBloomFilterSize, SSTableWriter writer, Collection<SSTableReader> sstables)
+ throws IOException
{
if (writer == null)
{
@@ -1218,32 +1042,6 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- private static class ScrubInfo extends CompactionInfo.Holder
- {
- private final RandomAccessReader dataFile;
- private final SSTableReader sstable;
- public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
- {
- this.dataFile = dataFile;
- this.sstable = sstable;
- }
-
- public CompactionInfo getCompactionInfo()
- {
- try
- {
- return new CompactionInfo(sstable.metadata,
- OperationType.SCRUB,
- dataFile.getFilePointer(),
- dataFile.length());
- }
- catch (Exception e)
- {
- throw new RuntimeException();
- }
- }
- }
-
public void stopCompaction(String type)
{
OperationType operation = OperationType.valueOf(type);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index beb7d74..9504072 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -85,11 +85,16 @@ public class LeveledManifest
static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize)
{
+ return create(cfs, maxSSTableSize, cfs.getSSTables());
+ }
+
+ public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, Iterable<SSTableReader> sstables)
+ {
LeveledManifest manifest = new LeveledManifest(cfs, maxSSTableSize);
- load(cfs, manifest);
+ load(cfs, manifest, sstables);
// ensure all SSTables are in the manifest
- for (SSTableReader ssTableReader : cfs.getSSTables())
+ for (SSTableReader ssTableReader : sstables)
{
if (manifest.levelOf(ssTableReader) < 0)
manifest.add(ssTableReader);
@@ -98,7 +103,7 @@ public class LeveledManifest
return manifest;
}
- private static void load(ColumnFamilyStore cfs, LeveledManifest manifest)
+ private static void load(ColumnFamilyStore cfs, LeveledManifest manifest, Iterable<SSTableReader> sstables)
{
File manifestFile = tryGetManifest(cfs);
if (manifestFile == null)
@@ -116,7 +121,7 @@ public class LeveledManifest
JsonNode generationValues = generation.get("members");
for (JsonNode generationValue : generationValues)
{
- for (SSTableReader ssTableReader : cfs.getSSTables())
+ for (SSTableReader ssTableReader : sstables)
{
if (ssTableReader.descriptor.generation == generationValue.getIntValue())
{
@@ -217,6 +222,14 @@ public class LeveledManifest
serialize();
}
+ public synchronized void sendBackToL0(SSTableReader sstable)
+ {
+ remove(sstable);
+ add(sstable, 0);
+
+ serialize();
+ }
+
private String toString(Iterable<SSTableReader> sstables)
{
StringBuilder builder = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
new file mode 100644
index 0000000..314a873
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.nio.ByteBuffer;
+import java.io.*;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IntervalTree.*;
+import org.apache.cassandra.utils.OutputHandler;
+
+/**
+ * Rewrites a (possibly corrupted) sstable, keeping every row that can still be deserialized.
+ * Rows whose header is unreadable are retried using the position recorded in the primary index.
+ * Rows that still cannot be read are skipped, except for counter column families, where the scrub fails instead.
+ */
+public class Scrubber implements Closeable
+{
+    public final ColumnFamilyStore cfs;
+    public final SSTableReader sstable;
+    public final File destination;
+
+    private final CompactionController controller;
+    private final boolean isCommutative;
+    private final int expectedBloomFilterSize;
+
+    private final RandomAccessReader dataFile;
+    private final RandomAccessReader indexFile;
+    private final ScrubInfo scrubInfo;
+
+    private long rowsRead;
+
+    private SSTableWriter writer;
+    private SSTableReader newSstable;
+    private SSTableReader newInOrderSstable;
+
+    // Last row appended to writer; every row appended after it must have a strictly greater key.
+    private AbstractCompactedRow prevRow;
+
+    private int goodRows;
+    private int badRows;
+    private int emptyRows;
+
+    private final OutputHandler outputHandler;
+
+    private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+    {
+        public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+        {
+            return r1.key.compareTo(r2.key);
+        }
+    };
+    // Rows that could not be appended in key order; written, sorted, to a separate sstable after the main pass.
+    private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+
+    /**
+     * Creates an online scrubber for {@code sstable} that reports through the logger.
+     */
+    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
+    {
+        this(cfs, sstable, new OutputHandler.LogOutput(), false);
+    }
+
+    /**
+     * @param cfs the column family {@code sstable} belongs to
+     * @param sstable the sstable to scrub
+     * @param outputHandler receives progress messages and warnings
+     * @param isOffline true when scrubbing without a running node; tombstones are then never purged
+     * @throws IOException if no data directory can hold a copy of {@code sstable} ("disk full"), or if
+     *                     opening its files fails
+     */
+    public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler, boolean isOffline) throws IOException
+    {
+        this.cfs = cfs;
+        this.sstable = sstable;
+        this.outputHandler = outputHandler;
+
+        // Pick a directory with room for a copy as large as the sstable currently is on disk
+        this.destination = cfs.directories.getDirectoryForNewSSTables(sstable.onDiskLength());
+        if (destination == null)
+            throw new IOException("disk full");
+
+        List<SSTableReader> toScrub = Collections.singletonList(sstable);
+        // When scrubbing offline we must never purge tombstones: we cannot know whether other sstables
+        // hold data that those tombstones delete.
+        this.controller = isOffline
+                        ? new ScrubController(cfs)
+                        : new CompactionController(cfs, toScrub, CompactionManager.getDefaultGcBefore(cfs), true);
+        this.isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
+        this.expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub)));
+
+        // loop through each row, deserializing to check for damage.
+        // we'll also loop through the index at the same time, using the position from the index to recover if the
+        // row header (key or data size) is corrupt. (This means our position in the index file will be one row
+        // "ahead" of the data file.)
+        this.dataFile = sstable.openDataReader(true);
+        boolean indexOpened = false;
+        try
+        {
+            this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+            indexOpened = true;
+        }
+        finally
+        {
+            // don't leak the data file if the index cannot be opened
+            if (!indexOpened)
+                FileUtils.closeQuietly(dataFile);
+        }
+        this.scrubInfo = new ScrubInfo(dataFile, sstable);
+    }
+
+    /**
+     * Copies every readable row of {@code sstable} into a new sstable, available afterwards from {@link #getNewSSTable()}.
+     * Empty (tombstoned) rows are dropped. Rows found out of key order go to a second sstable
+     * ({@link #getNewInOrderSSTable()}). Unreadable rows are skipped, or, for counter column families, cause an
+     * {@link IOError}. If nothing could be written, the temporary output sstable is discarded.
+     *
+     * @throws CompactionInterruptedException if a stop is requested while scrubbing
+     */
+    public void scrub() throws IOException
+    {
+        outputHandler.output("Scrubbing " + sstable);
+        boolean completed = false;
+        try
+        {
+            ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+            {
+                // throw away variable so we don't have a side effect in the assert
+                long firstRowPositionFromIndex = indexFile.readLong();
+                assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
+            }
+
+            // TODO errors when creating the writer may leave empty temp files.
+            writer = CompactionManager.maybeCreateWriter(cfs, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable));
+
+            while (!dataFile.isEOF())
+            {
+                if (scrubInfo.isStopRequested())
+                    throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());
+                long rowStart = dataFile.getFilePointer();
+                outputHandler.debug("Reading row at " + rowStart);
+
+                DecoratedKey key = null;
+                long dataSize = -1;
+                try
+                {
+                    key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+                    dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+                    outputHandler.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    // check for null key below
+                }
+
+                ByteBuffer currentIndexKey = nextIndexKey;
+                long nextRowPositionFromIndex;
+                try
+                {
+                    nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+                    nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    outputHandler.warn("Error reading index file", th);
+                    nextIndexKey = null;
+                    nextRowPositionFromIndex = dataFile.length();
+                }
+
+                long dataStart = dataFile.getFilePointer();
+                long dataStartFromIndex = currentIndexKey == null
+                                        ? -1
+                                        : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+                long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+                assert currentIndexKey != null || indexFile.isEOF();
+                if (currentIndexKey != null)
+                    outputHandler.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex));
+
+                writer.mark();
+                try
+                {
+                    if (key == null)
+                        throw new IOError(new IOException("Unable to read row key from data file"));
+                    if (dataSize < 0 || dataSize > dataFile.length())
+                        throw new IOError(new IOException("Impossible row size " + dataSize));
+                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+                    appendRow(controller.getCompactedRow(row));
+                    if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
+                        outputHandler.warn("Index file contained a different key or row size; using key from data file");
+                }
+                catch (Throwable th)
+                {
+                    throwIfFatal(th);
+                    outputHandler.warn("Non-fatal error reading row (stacktrace follows)", th);
+                    writer.resetAndTruncate();
+
+                    if (currentIndexKey != null
+                        && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+                    {
+                        outputHandler.output(String.format("Retrying from row index; data is %s bytes starting at %s",
+                                                           dataSizeFromIndex, dataStartFromIndex));
+                        key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
+                        try
+                        {
+                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+                            appendRow(controller.getCompactedRow(row));
+                        }
+                        catch (Throwable th2)
+                        {
+                            throwIfFatal(th2);
+                            // Skipping rows is dangerous for counters (see CASSANDRA-2759)
+                            if (isCommutative)
+                                throw new IOError(th2);
+
+                            outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2);
+                            writer.resetAndTruncate();
+                            dataFile.seek(nextRowPositionFromIndex);
+                            badRows++;
+                        }
+                    }
+                    else
+                    {
+                        // Skipping rows is dangerous for counters (see CASSANDRA-2759)
+                        if (isCommutative)
+                            throw new IOError(th);
+
+                        outputHandler.warn("Row at " + dataStart + " is unreadable; skipping to next");
+                        if (currentIndexKey != null)
+                            dataFile.seek(nextRowPositionFromIndex);
+                        badRows++;
+                    }
+                }
+                if ((rowsRead++ % 1000) == 0)
+                    controller.mayThrottle(dataFile.getFilePointer());
+            }
+
+            if (writer.getFilePointer() > 0)
+            {
+                newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+                completed = true;
+            }
+            // otherwise nothing was written, and the finally block discards the empty temporary sstable
+        }
+        catch (Exception e)
+        {
+            throw FBUtilities.unchecked(e);
+        }
+        finally
+        {
+            // Also runs for Errors (e.g. the IOError raised for unreadable counter rows), which the catch does not see
+            if (!completed && writer != null)
+                writer.abort();
+        }
+
+        if (!outOfOrderRows.isEmpty())
+            writeOutOfOrderRows();
+
+        if (newSstable == null)
+        {
+            if (badRows > 0)
+                outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
+            else
+                outputHandler.output("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned");
+        }
+        else
+        {
+            outputHandler.output("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
+            if (badRows > 0)
+                outputHandler.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any");
+        }
+    }
+
+    /**
+     * Routes one deserialized row, as follows:
+     * <ul>
+     * <li>empty rows are only counted;</li>
+     * <li>rows whose key does not sort strictly after the previously appended row are set aside in outOfOrderRows;</li>
+     * <li>every other row is appended to the writer.</li>
+     * </ul>
+     */
+    private void appendRow(AbstractCompactedRow compactedRow) throws IOException
+    {
+        if (compactedRow.isEmpty())
+        {
+            emptyRows++;
+            return;
+        }
+
+        // An sstable must have strictly increasing keys, so a duplicate of the previous key is "out of order" too
+        if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+        {
+            if (outOfOrderRows.add(compactedRow))
+            {
+                outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+            }
+            else
+            {
+                // The set already holds a row with this key and cannot hold a second one: report the loss instead of hiding it
+                outputHandler.warn(String.format("Out of order row %s dropped: another row with the same key was already set aside", compactedRow.key));
+                badRows++;
+            }
+            return;
+        }
+
+        writer.append(compactedRow);
+        prevRow = compactedRow;
+        goodRows++;
+    }
+
+    /**
+     * Writes the rows set aside by appendRow to a new sstable, in the sorted order of outOfOrderRows.
+     * The new sstable is discarded if writing it fails.
+     * NOTE(review): these rows are appended after the main pass has moved on in dataFile. Confirm that the
+     * AbstractCompactedRow returned by the controller does not need to re-read dataFile at this point.
+     */
+    private void writeOutOfOrderRows() throws IOException
+    {
+        SSTableWriter inOrderWriter = CompactionManager.maybeCreateWriter(cfs, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable));
+        boolean completed = false;
+        try
+        {
+            for (AbstractCompactedRow row : outOfOrderRows)
+                inOrderWriter.append(row);
+            newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
+            completed = true;
+        }
+        finally
+        {
+            if (!completed)
+                inOrderWriter.abort();
+        }
+        outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
+    }
+
+    /** @return the scrubbed copy of the sstable, or null if no row could be written */
+    public SSTableReader getNewSSTable()
+    {
+        return newSstable;
+    }
+
+    /** @return the sstable holding the rows that were found out of order, or null if there were none */
+    public SSTableReader getNewInOrderSSTable()
+    {
+        return newInOrderSstable;
+    }
+
+    /**
+     * Rethrows {@code th} if it is an Error other than AssertionError or IOError. Those two, and all Exceptions,
+     * are treated as recoverable row-level corruption by the caller.
+     */
+    private void throwIfFatal(Throwable th)
+    {
+        if (th instanceof Error && !(th instanceof AssertionError || th instanceof IOError))
+            throw (Error) th;
+    }
+
+    /** Closes the data and index readers, ignoring errors. */
+    public void close()
+    {
+        FileUtils.closeQuietly(dataFile);
+        FileUtils.closeQuietly(indexFile);
+    }
+
+    /** @return a progress holder reporting the position reached in the data file */
+    public CompactionInfo.Holder getScrubInfo()
+    {
+        return scrubInfo;
+    }
+
+    private static class ScrubInfo extends CompactionInfo.Holder
+    {
+        private final RandomAccessReader dataFile;
+        private final SSTableReader sstable;
+
+        public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
+        {
+            this.dataFile = dataFile;
+            this.sstable = sstable;
+        }
+
+        public CompactionInfo getCompactionInfo()
+        {
+            try
+            {
+                return new CompactionInfo(sstable.metadata,
+                                          OperationType.SCRUB,
+                                          dataFile.getFilePointer(),
+                                          dataFile.length());
+            }
+            catch (Exception e)
+            {
+                // keep the cause: without it the failure cannot be diagnosed
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Controller used for offline scrubs. It never purges, because without the other sstables of the column family
+     * we cannot tell whether a tombstone is still needed.
+     */
+    private static class ScrubController extends CompactionController
+    {
+        public ScrubController(ColumnFamilyStore cfs)
+        {
+            super(cfs, Integer.MAX_VALUE, true, new IntervalTree<SSTableReader>(Collections.<Interval>emptyList()), false);
+        }
+
+        @Override
+        public boolean shouldPurge(DecoratedKey key)
+        {
+            return false;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index cfd5fe4..25033ec 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.OutputHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -345,10 +346,11 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
}
}
- static class NullOutputHandler implements SSTableLoader.OutputHandler
+ /** An {@link OutputHandler} that silently discards every message, including warnings and their stack traces. */
+ static class NullOutputHandler implements OutputHandler
{
public void output(String msg) {}
-
public void debug(String msg) {}
+ public void warn(String msg) {}
+ public void warn(String msg, Throwable th) {}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 850d23b..b40d80b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -35,8 +35,7 @@ import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.*;
/**
* Cassandra SSTable bulk loader.
@@ -252,15 +251,6 @@ public class SSTableLoader
}
}
- public interface OutputHandler
- {
- // called when an important info need to be displayed
- public void output(String msg);
-
- // called when a less important info need to be displayed
- public void debug(String msg);
- }
-
public static abstract class Client
{
private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<InetAddress, Collection<Range<Token>>>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 7d4d304..0aaa932 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -138,6 +138,11 @@ public class SSTableReader extends SSTable
return open(desc, componentsFor(desc), metadata, p);
}
+    /**
+     * Opens the sstable described by {@code descriptor} without the final check that its first key does not sort
+     * after its last key, so that broken sstables can still be opened (the offline scrubber relies on this).
+     * No saved keys or data tracker are used. The partitioner is the one returned by
+     * {@link StorageService#getPartitioner()}.
+     */
+    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
+    {
+        Set<DecoratedKey> noSavedKeys = Collections.emptySet();
+        IPartitioner partitioner = StorageService.getPartitioner();
+        return open(descriptor, components, noSavedKeys, null, metadata, partitioner, false);
+    }
+
public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
{
return open(descriptor, components, Collections.<DecoratedKey>emptySet(), null, metadata, partitioner);
@@ -145,6 +150,17 @@ public class SSTableReader extends SSTable
public static SSTableReader open(Descriptor descriptor, Set<Component> components, Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData metadata, IPartitioner partitioner) throws IOException
{
+ return open(descriptor, components, savedKeys, tracker, metadata, partitioner, true);
+ }
+
+ private static SSTableReader open(Descriptor descriptor,
+ Set<Component> components,
+ Set<DecoratedKey> savedKeys,
+ DataTracker tracker,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ boolean validate) throws IOException
+ {
assert partitioner != null;
// Minimum components without which we can't do anything
assert components.contains(Component.DATA);
@@ -187,6 +203,10 @@ public class SSTableReader extends SSTable
sstable.load(false, savedKeys);
sstable.loadBloomFilter();
}
+
+ if (validate)
+ sstable.validate();
+
if (logger.isDebugEnabled())
logger.debug("INDEX LOAD TIME for " + descriptor + ": " + (System.currentTimeMillis() - start) + " ms.");
@@ -409,13 +429,18 @@ public class SSTableReader extends SSTable
}
this.first = getMinimalKey(left);
this.last = getMinimalKey(right);
- assert this.first.compareTo(this.last) <= 0: String.format("SSTable first key %s > last key %s", this.first, this.last);
// finalize the state of the reader
ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
}
+    /**
+     * Checks that the first key of this sstable does not sort after its last key.
+     *
+     * @throws IllegalStateException if first > last, which presumably means the sstable is corrupt or out of order
+     */
+    private void validate()
+    {
+        boolean keysInOrder = first.compareTo(last) <= 0;
+        if (!keysInOrder)
+            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
+    }
+
/** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
private long getIndexScanPosition(RowPosition key)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index dbaf751..9d9690a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -68,6 +68,7 @@ import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.NodeId;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.OutputHandler;
import org.apache.cassandra.utils.WrappedRunnable;
/**
@@ -3038,13 +3039,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
};
- SSTableLoader.OutputHandler oh = new SSTableLoader.OutputHandler()
- {
- public void output(String msg) { logger_.info(msg); }
- public void debug(String msg) { logger_.debug(msg); }
- };
-
- SSTableLoader loader = new SSTableLoader(dir, client, oh);
+ SSTableLoader loader = new SSTableLoader(dir, client, new OutputHandler.LogOutput());
try
{
loader.stream().get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 4520188..ace37db 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.streaming.PendingFile;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.OutputHandler;
import org.apache.commons.cli.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -55,7 +56,8 @@ public class BulkLoader
LoaderOptions options = LoaderOptions.parseArgs(args);
try
{
- SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(options, options.hosts, options.rpcPort), options);
+ OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
+ SSTableLoader loader = new SSTableLoader(options.directory, new ExternalClient(handler, options.hosts, options.rpcPort), handler);
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
SSTableLoader.LoaderFuture future = loader.stream(options.ignores);
@@ -174,11 +176,11 @@ public class BulkLoader
static class ExternalClient extends SSTableLoader.Client
{
private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>();
- private final SSTableLoader.OutputHandler outputHandler;
+ private final OutputHandler outputHandler;
private Set<InetAddress> hosts = new HashSet<InetAddress>();
private int rpcPort;
- public ExternalClient(SSTableLoader.OutputHandler outputHandler, Set<InetAddress> hosts, int port)
+ public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port)
{
super();
this.outputHandler = outputHandler;
@@ -245,7 +247,7 @@ public class BulkLoader
}
}
- static class LoaderOptions implements SSTableLoader.OutputHandler
+ static class LoaderOptions
{
public final File directory;
@@ -367,18 +369,6 @@ public class BulkLoader
printUsage(options);
System.exit(1);
}
-
- public void output(String msg)
- {
- System.out.println(msg);
- }
-
- public void debug(String msg)
- {
- if (verbose)
- System.out.println(msg);
- }
-
private static CmdLineOptions getCmdLineOptions()
{
CmdLineOptions options = new CmdLineOptions();
@@ -409,7 +399,7 @@ public class BulkLoader
}
}
- private static class CmdLineOptions extends Options
+ public static class CmdLineOptions extends Options
{
/**
* Add option with argument and argument name
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
new file mode 100644
index 0000000..0ac6a80
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.tools;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.commons.cli.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.db.compaction.LeveledManifest;
+import org.apache.cassandra.db.compaction.Scrubber;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.AbstractCassandraDaemon;
+import org.apache.cassandra.utils.OutputHandler;
+import static org.apache.cassandra.tools.BulkLoader.CmdLineOptions;
+
+/**
+ * Command-line tool ({@code sstablescrub}) that scrubs the sstables of one column family without a running node,
+ * and checks and repairs its leveled manifest.
+ */
+public class StandaloneScrubber
+{
+    static
+    {
+        AbstractCassandraDaemon.initLog4j();
+    }
+
+    private static final String TOOL_NAME = "sstablescrub";
+    private static final String VERBOSE_OPTION = "verbose";
+    private static final String DEBUG_OPTION = "debug";
+    private static final String HELP_OPTION = "help";
+    private static final String MANIFEST_CHECK_OPTION = "manifest-check";
+
+    /**
+     * Entry point of the tool. Usage: {@code sstablescrub [options] <keyspace> <column_family>}.
+     * <p>
+     * The tool proceeds in four steps:
+     * <ol>
+     * <li>Opens every live sstable of the column family without validating it.</li>
+     * <li>Snapshots those sstables, and the leveled manifest if there is one, under {@code pre-scrub-<timestamp>}.</li>
+     * <li>Scrubs the snapshotted sstables, unless --manifest-check was given.</li>
+     * <li>Checks and repairs the leveled manifest.</li>
+     * </ol>
+     * Always terminates the JVM: status 0 on success, 1 on error.
+     */
+    public static void main(String args[]) throws IOException
+    {
+        Options options = Options.parseArgs(args);
+        try
+        {
+            // load keyspace descriptions.
+            DatabaseDescriptor.loadSchemas();
+
+            if (Schema.instance.getCFMetaData(options.tableName, options.cfName) == null)
+                throw new IllegalArgumentException(String.format("Unknown keyspace/columnFamily %s.%s",
+                                                                 options.tableName,
+                                                                 options.cfName));
+
+            // Do not load sstables since they might be broken
+            Table table = Table.openWithoutSSTables(options.tableName);
+            ColumnFamilyStore cfs = table.getColumnFamilyStore(options.cfName);
+            String snapshotName = "pre-scrub-" + System.currentTimeMillis();
+
+            OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
+            Directories.SSTableLister lister = cfs.directories.sstableLister().skipCompacted(true).skipTemporary(true);
+
+            // sstables that were both opened and snapshotted; only these may be scrubbed (and thus replaced)
+            List<SSTableReader> sstables = new ArrayList<SSTableReader>();
+
+            // Open and snapshot the sstables
+            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
+            {
+                Set<Component> components = entry.getValue();
+                if (!components.contains(Component.DATA) || !components.contains(Component.PRIMARY_INDEX))
+                    continue;
+
+                try
+                {
+                    SSTableReader sstable = SSTableReader.openNoValidation(entry.getKey(), components, cfs.metadata);
+
+                    // Snapshot first, and only then schedule the sstable for scrubbing. A scrubbed original is marked
+                    // compacted and deleted, so it must never be scrubbed without a pre-scrub copy.
+                    File snapshotDirectory = Directories.getSnapshotDirectory(sstable.descriptor, snapshotName);
+                    sstable.createLinks(snapshotDirectory.getPath());
+                    sstables.add(sstable);
+                }
+                catch (Exception e)
+                {
+                    System.err.println(String.format("Error Loading %s: %s", entry.getKey(), e.getMessage()));
+                    if (options.debug)
+                        e.printStackTrace(System.err);
+                }
+            }
+            System.out.println(String.format("Pre-scrub sstables snapshotted into snapshot %s", snapshotName));
+
+            // If leveled, load the manifest
+            LeveledManifest manifest = null;
+            if (cfs.directories.tryGetLeveledManifest() != null)
+            {
+                cfs.directories.snapshotLeveledManifest(snapshotName);
+                System.out.println(String.format("Leveled manifest snapshotted into snapshot %s", snapshotName));
+
+                int maxSizeInMB = (int)((((LeveledCompactionStrategy)cfs.getCompactionStrategy()).getMaxSSTableSize()) / (1024L * 1024L));
+                manifest = LeveledManifest.create(cfs, maxSizeInMB, sstables);
+            }
+
+            if (!options.manifestCheckOnly)
+            {
+                for (SSTableReader sstable : sstables)
+                {
+                    try
+                    {
+                        Scrubber scrubber = new Scrubber(cfs, sstable, handler, true);
+                        try
+                        {
+                            scrubber.scrub();
+                        }
+                        finally
+                        {
+                            scrubber.close();
+                        }
+
+                        if (manifest != null)
+                        {
+                            if (scrubber.getNewInOrderSSTable() != null)
+                                manifest.add(scrubber.getNewInOrderSSTable());
+
+                            List<SSTableReader> added = scrubber.getNewSSTable() == null
+                                                      ? Collections.<SSTableReader>emptyList()
+                                                      : Collections.<SSTableReader>singletonList(scrubber.getNewSSTable());
+                            manifest.replace(Collections.singletonList(sstable), added);
+                        }
+
+                        // Remove the sstable (it's been copied by scrub and snapshotted)
+                        sstable.markCompacted();
+                        sstable.releaseReference();
+                    }
+                    catch (Exception e)
+                    {
+                        System.err.println(String.format("Error scrubbing %s: %s", sstable, e.getMessage()));
+                        if (options.debug)
+                            e.printStackTrace(System.err);
+                    }
+                }
+            }
+
+            // Check (and repair) manifest
+            if (manifest != null)
+                checkManifest(manifest);
+
+            SSTableDeletingTask.waitForDeletions();
+            System.exit(0); // We need that to stop non daemonized threads
+        }
+        catch (Throwable t)
+        {
+            // Catch Errors too (e.g. the IOError a scrub raises for unreadable counter rows). Otherwise main would end
+            // without reaching System.exit, and the non-daemon threads would keep the tool running.
+            System.err.println(t.getMessage());
+            if (options.debug)
+                t.printStackTrace(System.err);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Checks that no two sstables of the same level (L1 and up) overlap. When two overlap, the later one in
+     * {@code SSTable.sstableComparator} order is sent back to L0.
+     */
+    private static void checkManifest(LeveledManifest manifest)
+    {
+        System.out.println("Checking leveled manifest");
+        for (int i = 1; i <= manifest.getLevelCount(); ++i)
+        {
+            List<SSTableReader> sstables = new ArrayList<SSTableReader>(manifest.getLevel(i));
+            Collections.sort(sstables, SSTable.sstableComparator);
+            if (sstables.isEmpty())
+                continue;
+
+            Iterator<SSTableReader> iter = sstables.iterator();
+            SSTableReader previous = iter.next();
+            while (iter.hasNext())
+            {
+                SSTableReader current = iter.next();
+
+                if (previous.last.compareTo(current.first) > 0)
+                {
+                    System.err.println(String.format("At level %d, %s [%s, %s] overlaps %s [%s, %s]", i,
+                                                     previous, previous.first, previous.last,
+                                                     current, current.first, current.last));
+                    System.out.println(String.format("Sending %s back to L0 to fix intra-level overlapping", current));
+                    manifest.sendBackToL0(current);
+                }
+                else
+                {
+                    previous = current;
+                }
+            }
+        }
+    }
+
+    /** Parsed command-line arguments of the tool. */
+    private static class Options
+    {
+        public final String tableName;
+        public final String cfName;
+
+        public boolean debug;
+        public boolean verbose;
+        public boolean manifestCheckOnly;
+
+        private Options(String tableName, String cfName)
+        {
+            this.tableName = tableName;
+            this.cfName = cfName;
+        }
+
+        /**
+         * Parses the command line. Prints the usage and exits the JVM on --help (status 0) and on invalid arguments
+         * (status 1). Whenever it returns, the result is non-null.
+         */
+        public static Options parseArgs(String cmdArgs[])
+        {
+            CommandLineParser parser = new GnuParser();
+            CmdLineOptions options = getCmdLineOptions();
+            try
+            {
+                CommandLine cmd = parser.parse(options, cmdArgs, false);
+
+                if (cmd.hasOption(HELP_OPTION))
+                {
+                    printUsage(options);
+                    System.exit(0);
+                }
+
+                String[] args = cmd.getArgs();
+                if (args.length != 2)
+                    errorMsg(args.length < 2 ? "Missing arguments" : "Too many arguments", options);
+
+                Options opts = new Options(args[0], args[1]);
+                opts.debug = cmd.hasOption(DEBUG_OPTION);
+                opts.verbose = cmd.hasOption(VERBOSE_OPTION);
+                opts.manifestCheckOnly = cmd.hasOption(MANIFEST_CHECK_OPTION);
+                return opts;
+            }
+            catch (ParseException e)
+            {
+                errorMsg(e.getMessage(), options);
+                return null;
+            }
+        }
+
+        /** Prints {@code msg} and the usage to stderr/stdout, then exits the JVM with status 1. */
+        private static void errorMsg(String msg, CmdLineOptions options)
+        {
+            System.err.println(msg);
+            printUsage(options);
+            System.exit(1);
+        }
+
+        private static CmdLineOptions getCmdLineOptions()
+        {
+            CmdLineOptions options = new CmdLineOptions();
+            options.addOption(null, DEBUG_OPTION, "display stack traces");
+            options.addOption("v", VERBOSE_OPTION, "verbose output");
+            options.addOption("h", HELP_OPTION, "display this help message");
+            options.addOption("m", MANIFEST_CHECK_OPTION, "only check and repair the leveled manifest, without actually scrubbing the sstables");
+            return options;
+        }
+
+        public static void printUsage(CmdLineOptions options)
+        {
+            String usage = String.format("%s [options] <keyspace> <column_family>", TOOL_NAME);
+            StringBuilder header = new StringBuilder();
+            header.append("--\n");
+            header.append("Scrub the sstable for the provided column family." );
+            header.append("\n--\n");
+            header.append("Options are:");
+            new HelpFormatter().printHelp(usage, header.toString(), options, "");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/src/java/org/apache/cassandra/utils/OutputHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/OutputHandler.java b/src/java/org/apache/cassandra/utils/OutputHandler.java
new file mode 100644
index 0000000..b203663
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/OutputHandler.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Destination for the messages produced by a tool or operation, so the same code can
+ * report either through the logging system or directly on the console.
+ */
+public interface OutputHandler
+{
+ /** Called when important information needs to be displayed. */
+ public void output(String msg);
+
+ /** Called when less important information needs to be displayed. */
+ public void debug(String msg);
+
+ /** Called when the user needs to be warned. */
+ public void warn(String msg);
+
+ /**
+ * Called when the user needs to be warned about an error.
+ *
+ * @param msg the warning text
+ * @param th the associated throwable; may be null
+ */
+ public void warn(String msg, Throwable th);
+
+ /**
+ * Forwards every message to an SLF4J logger: output() at INFO, debug() at DEBUG,
+ * warn() at WARN (passing the throwable along when one is given).
+ */
+ public static class LogOutput implements OutputHandler
+ {
+ // final: the logger is a per-class constant and is never reassigned.
+ private static final Logger logger = LoggerFactory.getLogger(LogOutput.class);
+
+ @Override
+ public void output(String msg)
+ {
+ logger.info(msg);
+ }
+
+ @Override
+ public void debug(String msg)
+ {
+ logger.debug(msg);
+ }
+
+ @Override
+ public void warn(String msg)
+ {
+ logger.warn(msg);
+ }
+
+ @Override
+ public void warn(String msg, Throwable th)
+ {
+ logger.warn(msg, th);
+ }
+ }
+
+ /**
+ * Prints messages on System.out. debug() messages are printed only when {@code debug}
+ * is set; warnings are prefixed with "WARNING: " and the stack trace of the attached
+ * throwable is printed only when {@code printStack} is set.
+ */
+ public static class SystemOutput implements OutputHandler
+ {
+ public final boolean debug;
+ public final boolean printStack;
+
+ /**
+ * @param debug whether debug() messages are printed
+ * @param printStack whether warn(msg, th) also prints the stack trace of th
+ */
+ public SystemOutput(boolean debug, boolean printStack)
+ {
+ this.debug = debug;
+ this.printStack = printStack;
+ }
+
+ @Override
+ public void output(String msg)
+ {
+ System.out.println(msg);
+ }
+
+ @Override
+ public void debug(String msg)
+ {
+ if (debug)
+ System.out.println(msg);
+ }
+
+ @Override
+ public void warn(String msg)
+ {
+ warn(msg, null);
+ }
+
+ @Override
+ public void warn(String msg, Throwable th)
+ {
+ // NOTE(review): warnings go to stdout rather than stderr; confirm that is intended.
+ System.out.println("WARNING: " + msg);
+ if (printStack && th != null)
+ th.printStackTrace(System.out);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75453d01/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 7cc431d..abec718 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -30,10 +30,13 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CLibrary;
@@ -41,29 +44,33 @@ import static org.apache.cassandra.Util.column;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import java.util.*;
+
public class ScrubTest extends SchemaLoader
{
public String TABLE = "Keyspace1";
public String CF = "Standard1";
public String CF2 = "Super5";
public String CF3 = "Standard2";
- public String corruptSSTableName;
-
- public void copySSTables() throws IOException
+ public String copySSTables(String cf) throws IOException
{
String root = System.getProperty("corrupt-sstable-root");
assert root != null;
File rootDir = new File(root);
assert rootDir.isDirectory();
- File destDir = Directories.create(TABLE, CF2).getDirectoryForNewSSTables(1);
+ File destDir = Directories.create(TABLE, cf).getDirectoryForNewSSTables(1);
+
+ String corruptSSTableName = null;
FileUtils.createDirectory(destDir);
for (File srcFile : rootDir.listFiles())
{
if (srcFile.getName().equals(".svn"))
continue;
+ if (!srcFile.getName().contains(cf))
+ continue;
File destFile = new File(destDir, srcFile.getName());
CLibrary.createHardLink(srcFile, destFile);
@@ -74,12 +81,13 @@ public class ScrubTest extends SchemaLoader
}
assert corruptSSTableName != null;
+ return corruptSSTableName;
}
@Test
public void testScrubFile() throws Exception
{
- copySSTables();
+ copySSTables(CF2);
Table table = Table.open(TABLE);
ColumnFamilyStore cfs = table.getColumnFamilyStore(CF2);
@@ -104,7 +112,6 @@ public class ScrubTest extends SchemaLoader
assertEquals(100, rows.size());
}
-
@Test
public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
{
@@ -166,6 +173,57 @@ public class ScrubTest extends SchemaLoader
assertEquals(10, rows.size());
}
+ /**
+ * Loads a pre-generated sstable whose rows are written out of order, checks that a
+ * range slice indeed returns them unordered, then scrubs the column family and checks
+ * that the 6 rows come back ordered.
+ * Uses JUnit assertions rather than the assert keyword so the checks also run without -ea.
+ */
+ @Test
+ public void testScubOutOfOrder() throws Exception
+ {
+ CompactionManager.instance.disableAutoCompaction();
+ Table table = Table.open(TABLE);
+ String columnFamily = "Standard3";
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(columnFamily);
+
+ /*
+ * Code used to generate an outOfOrder sstable. The test must be run without assertions for this to work.
+ * The test also assumes an ordered partitioner.
+ *
+ * ColumnFamily cf = ColumnFamily.create(TABLE, columnFamily);
+ * cf.addColumn(new Column(ByteBufferUtil.bytes("someName"), ByteBufferUtil.bytes("someValue"), 0L));
+ *
+ * SSTableWriter writer = cfs.createCompactionWriter((long)DatabaseDescriptor.getIndexInterval(), new File("."), Collections.<SSTableReader>emptyList());
+ * writer.append(Util.dk("a"), cf);
+ * writer.append(Util.dk("b"), cf);
+ * writer.append(Util.dk("z"), cf);
+ * writer.append(Util.dk("c"), cf);
+ * writer.append(Util.dk("y"), cf);
+ * writer.append(Util.dk("d"), cf);
+ * writer.closeAndOpenReader();
+ */
+
+ copySSTables(columnFamily);
+ cfs.loadNewSSTables();
+ if (cfs.getSSTables().size() == 0)
+ fail("No sstable was loaded for " + columnFamily);
+
+ List<Row> rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter(), null);
+ // Precondition: the fixture must really be out of order, otherwise the test proves nothing.
+ if (isRowOrdered(rows))
+ fail("'corrupt' test file actually was not");
+
+ CompactionManager.instance.performScrub(cfs);
+ rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter(), null);
+ if (!isRowOrdered(rows))
+ fail("Scrub failed: " + rows);
+ assertEquals(6, rows.size());
+ }
+
+ /**
+ * Checks that the keys of the given rows never decrease.
+ *
+ * @param rows the rows, in the order they were returned
+ * @return false as soon as a key compares greater than the key that follows it, true otherwise
+ */
+ private static boolean isRowOrdered(List<Row> rows)
+ {
+ DecoratedKey last = null;
+ for (Iterator<Row> iter = rows.iterator(); iter.hasNext(); )
+ {
+ DecoratedKey current = iter.next().key;
+ boolean outOfOrder = last != null && last.compareTo(current) > 0;
+ if (outOfOrder)
+ return false;
+ last = current;
+ }
+ return true;
+ }
+
protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException
{
for (int i = 0; i < rowsPerSSTable; i++)