You are viewing a plain-text version of this content. The link to the canonical version ("here") did not survive the plain-text conversion.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/04/28 09:22:36 UTC
[1/2] git commit: Multi-threaded scrub/cleanup/upgradesstables
Repository: cassandra
Updated Branches:
refs/heads/trunk 7000efa21 -> d0ddff1d3
Multi-threaded scrub/cleanup/upgradesstables
Patch by rspitzer and marcuse for CASSANDRA-5547
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c8dcc751
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c8dcc751
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c8dcc751
Branch: refs/heads/trunk
Commit: c8dcc7515e9f1d7f73f2d5e6651862a1b1023bea
Parents: fccc123
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Apr 24 08:57:46 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Apr 28 08:52:00 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 328 +++++++++----------
2 files changed, 153 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8dcc751/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5baaefd..1b67357 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -52,6 +52,7 @@
* fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
* Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
* Preemptive opening of compaction result (CASSANDRA-6916)
+ * Multi-threaded scrub/cleanup/upgradesstables (CASSANDRA-5547)
Merged from 2.0:
* Set JMX RMI port to 7199 (CASSANDRA-7087)
* Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8dcc751/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 30d564c..792c962 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -42,6 +42,7 @@ import javax.management.ObjectName;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
+import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ConcurrentHashMultiset;
@@ -240,127 +241,130 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- private abstract static class UnmarkingRunnable implements Runnable
+ private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation) throws ExecutionException, InterruptedException
{
- private final ColumnFamilyStore cfs;
- private final Iterable<SSTableReader> sstables;
-
- private UnmarkingRunnable(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables)
+ Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting();
+ if (compactingSSTables == null)
{
- this.cfs = cfs;
- this.sstables = sstables;
+ logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
+ return AllSSTableOpStatus.ABORTED;
}
-
- protected abstract void runMayThrow() throws IOException;
-
- public final void run()
+ if (Iterables.isEmpty(compactingSSTables))
{
- try
- {
- runMayThrow();
- }
- catch (Exception e)
- {
- throw Throwables.propagate(e);
- }
- finally
+ logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
+ return AllSSTableOpStatus.SUCCESSFUL;
+ }
+ try
+ {
+ Iterable<SSTableReader> sstables = operation.filterSSTables(compactingSSTables);
+ List<Future<Object>> futures = new ArrayList<>();
+
+ for (final SSTableReader sstable : sstables)
{
- cfs.getDataTracker().unmarkCompacting(sstables);
+ futures.add(executor.submit(new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ operation.execute(sstable);
+ return this;
+ }
+ }));
}
+
+ for (Future<Object> f : futures)
+ f.get();
+ }
+ finally
+ {
+ cfs.getDataTracker().unmarkCompacting(compactingSSTables);
}
+ return AllSSTableOpStatus.SUCCESSFUL;
+ }
+
+ private static interface OneSSTableOperation
+ {
+ Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input);
+ void execute(SSTableReader input) throws IOException;
}
public enum AllSSTableOpStatus { ABORTED, SUCCESSFUL }
public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted) throws InterruptedException, ExecutionException
{
- final Iterable<SSTableReader> sstables = cfs.markAllCompacting();
- if (sstables == null)
- {
- logger.info("Aborting scrub of {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
- return AllSSTableOpStatus.ABORTED;
- }
- if (Iterables.isEmpty(sstables))
+ assert !cfs.isIndex();
+ return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
- logger.info("No sstables to scrub for {}.{}", cfs.keyspace.getName(), cfs.name);
- return AllSSTableOpStatus.SUCCESSFUL;
- }
+ @Override
+ public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input)
+ {
+ return input;
+ }
- Runnable runnable = new UnmarkingRunnable(cfs, sstables)
- {
- protected void runMayThrow() throws IOException
+ @Override
+ public void execute(SSTableReader input) throws IOException
{
- doScrub(cfs, sstables, skipCorrupted);
+ scrubOne(cfs, input, skipCorrupted);
}
- };
- executor.submit(runnable).get();
- return AllSSTableOpStatus.SUCCESSFUL;
+ });
}
public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException
{
- final Iterable<SSTableReader> sstables = cfs.markAllCompacting();
- if (sstables == null)
- {
- logger.info("Aborting sstable format upgrade of {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
- return AllSSTableOpStatus.ABORTED;
- }
- if (Iterables.isEmpty(sstables))
- {
- logger.info("No sstables to upgrade for {}.{}", cfs.keyspace.getName(), cfs.name);
- return AllSSTableOpStatus.SUCCESSFUL;
- }
-
- Runnable runnable = new UnmarkingRunnable(cfs, sstables)
+ return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
{
- protected void runMayThrow() throws IOException
+ @Override
+ public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input)
{
- for (final SSTableReader sstable : sstables)
+ return Iterables.filter(input, new Predicate<SSTableReader>()
{
- if (excludeCurrentVersion && sstable.descriptor.version.equals(Descriptor.Version.CURRENT))
- continue;
-
- // SSTables are marked by the caller
- // NOTE: it is important that the task create one and only one sstable, even for Leveled compaction (see LeveledManifest.replace())
- AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(Collections.singleton(sstable), NO_GC, Long.MAX_VALUE);
- task.setUserDefined(true);
- task.setCompactionType(OperationType.UPGRADE_SSTABLES);
- task.execute(metrics);
- }
+ @Override
+ public boolean apply(SSTableReader sstable)
+ {
+ return !(excludeCurrentVersion && sstable.descriptor.version.equals(Descriptor.Version.CURRENT));
+ }
+ });
}
- };
- executor.submit(runnable).get();
- return AllSSTableOpStatus.SUCCESSFUL;
+
+ @Override
+ public void execute(SSTableReader input) throws IOException
+ {
+ AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(Collections.singleton(input), NO_GC, Long.MAX_VALUE);
+ task.setUserDefined(true);
+ task.setCompactionType(OperationType.UPGRADE_SSTABLES);
+ task.execute(metrics);
+ }
+ });
}
public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
{
- final Iterable<SSTableReader> sstables = cfStore.markAllCompacting();
- if (sstables == null)
+ assert !cfStore.isIndex();
+ Keyspace keyspace = cfStore.keyspace;
+ final Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+ if (ranges.isEmpty())
{
- logger.info("Aborting cleanup of {}.{} after failing to interrupt other compaction operations", cfStore.keyspace.getName(), cfStore.name);
+ logger.info("Cleanup cannot run before a node has joined the ring");
return AllSSTableOpStatus.ABORTED;
}
- if (Iterables.isEmpty(sstables))
- {
- logger.info("No sstables to cleanup for {}.{}", cfStore.keyspace.getName(), cfStore.name);
- return AllSSTableOpStatus.SUCCESSFUL;
- }
-
- Runnable runnable = new UnmarkingRunnable(cfStore, sstables)
+ final boolean hasIndexes = cfStore.indexManager.hasIndexes();
+ final CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges);
+ return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
{
- protected void runMayThrow() throws IOException
+ @Override
+ public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input)
{
- // Sort the column families in order of SSTable size, so cleanup of smaller CFs
- // can free up space for larger ones
- List<SSTableReader> sortedSSTables = Lists.newArrayList(sstables);
+ List<SSTableReader> sortedSSTables = Lists.newArrayList(input);
Collections.sort(sortedSSTables, new SSTableReader.SizeComparator());
+ return sortedSSTables;
+ }
- doCleanupCompaction(cfStore, sortedSSTables);
+ @Override
+ public void execute(SSTableReader input) throws IOException
+ {
+ doCleanupOne(cfStore, input, cleanupStrategy, ranges, hasIndexes);
}
- };
- executor.submit(runnable).get();
- return AllSSTableOpStatus.SUCCESSFUL;
+ });
}
public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
@@ -567,20 +571,6 @@ public class CompactionManager implements CompactionManagerMBean
}
}
- /**
- * Deserialize everything in the CFS and re-serialize w/ the newest version. Also attempts to recover
- * from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted
- * from early ByteBuffer bugs.
- *
- * @throws IOException
- */
- private void doScrub(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, boolean skipCorrupted) throws IOException
- {
- assert !cfs.isIndex();
- for (final SSTableReader sstable : sstables)
- scrubOne(cfs, sstable, skipCorrupted);
- }
-
private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
{
Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false);
@@ -652,108 +642,94 @@ public class CompactionManager implements CompactionManagerMBean
}
/**
- * This function goes over each file and removes the keys that the node is not responsible for
+ * This function goes over a file and removes the keys that the node is not responsible for
* and only keeps keys that this node is responsible for.
*
* @throws IOException
*/
- private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables) throws IOException
+ private void doCleanupOne(final ColumnFamilyStore cfs, SSTableReader sstable, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException
{
assert !cfs.isIndex();
- Keyspace keyspace = cfs.keyspace;
- Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
- if (ranges.isEmpty())
+
+ if (!hasIndexes && !new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges))
{
- logger.info("Cleanup cannot run before a node has joined the ring");
+ cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
return;
}
-
- boolean hasIndexes = cfs.indexManager.hasIndexes();
- CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges);
-
- for (SSTableReader sstable : sstables)
+ if (!needsCleanup(sstable, ranges))
{
- if (!hasIndexes && !new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges))
- {
- cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
- continue;
- }
- if (!needsCleanup(sstable, ranges))
- {
- logger.debug("Skipping {} for cleanup; all rows should be kept", sstable);
- continue;
- }
+ logger.debug("Skipping {} for cleanup; all rows should be kept", sstable);
+ return;
+ }
- CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs));
- long start = System.nanoTime();
+ long start = System.nanoTime();
- long totalkeysWritten = 0;
+ long totalkeysWritten = 0;
- int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
- (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
- if (logger.isDebugEnabled())
- logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
+ int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
+ (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+ if (logger.isDebugEnabled())
+ logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
- logger.info("Cleaning up {}", sstable);
+ logger.info("Cleaning up {}", sstable);
- File compactionFileLocation = cfs.directories.getDirectoryForCompactedSSTables();
- if (compactionFileLocation == null)
- throw new IOException("disk full");
+ File compactionFileLocation = cfs.directories.getDirectoryForCompactedSSTables();
+ if (compactionFileLocation == null)
+ throw new IOException("disk full");
- ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
- CleanupInfo ci = new CleanupInfo(sstable, scanner);
+ ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
+ CleanupInfo ci = new CleanupInfo(sstable, scanner);
- metrics.beginCompaction(ci);
- SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
+ metrics.beginCompaction(ci);
+ SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
- try
+ try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs)))
+ {
+ writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+
+ while (scanner.hasNext())
{
- writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
- while (scanner.hasNext())
- {
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
-
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
- row = cleanupStrategy.cleanup(row);
- if (row == null)
- continue;
- AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
- if (writer.append(compactedRow) != null)
- totalkeysWritten++;
- }
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ row = cleanupStrategy.cleanup(row);
+ if (row == null)
+ continue;
+ AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
+ if (writer.append(compactedRow) != null)
+ totalkeysWritten++;
+ }
- // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
- cfs.indexManager.flushIndexesBlocking();
+ // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
+ cfs.indexManager.flushIndexesBlocking();
- writer.finish();
- }
- catch (Throwable e)
- {
- writer.abort();
- throw Throwables.propagate(e);
- }
- finally
- {
- controller.close();
- scanner.close();
- metrics.finishCompaction(ci);
- }
+ writer.finish();
+ }
+ catch (Throwable e)
+ {
+ writer.abort();
+ throw Throwables.propagate(e);
+ }
+ finally
+ {
+ scanner.close();
+ metrics.finishCompaction(ci);
+ }
- List<SSTableReader> results = writer.finished();
- if (!results.isEmpty())
- {
- String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = sstable.onDiskLength();
- long endsize = 0;
- for (SSTableReader newSstable : results)
- endsize += newSstable.onDiskLength();
- double ratio = (double) endsize / (double) startsize;
- logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
- }
+ List<SSTableReader> results = writer.finished();
+ if (!results.isEmpty())
+ {
+ String format = "Cleaned up to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.";
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = sstable.onDiskLength();
+ long endsize = 0;
+ for (SSTableReader newSstable : results)
+ endsize += newSstable.onDiskLength();
+ double ratio = (double) endsize / (double) startsize;
+ logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
}
+
}
private static abstract class CleanupStrategy
[2/2] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d0ddff1d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d0ddff1d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d0ddff1d
Branch: refs/heads/trunk
Commit: d0ddff1d3ed44bdd3a902dd208207a810b1d1341
Parents: 7000efa c8dcc75
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Apr 28 09:22:23 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Apr 28 09:22:23 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 328 +++++++++----------
2 files changed, 153 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0ddff1d/CHANGES.txt
----------------------------------------------------------------------