You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/04/28 09:22:36 UTC

[1/2] git commit: Multi-threaded scrub/cleanup/upgradesstables

Repository: cassandra
Updated Branches:
  refs/heads/trunk 7000efa21 -> d0ddff1d3


Multi-threaded scrub/cleanup/upgradesstables

Patch by rspitzer and marcuse for CASSANDRA-5547


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c8dcc751
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c8dcc751
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c8dcc751

Branch: refs/heads/trunk
Commit: c8dcc7515e9f1d7f73f2d5e6651862a1b1023bea
Parents: fccc123
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Apr 24 08:57:46 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Apr 28 08:52:00 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        | 328 +++++++++----------
 2 files changed, 153 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8dcc751/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5baaefd..1b67357 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -52,6 +52,7 @@
  * fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
  * Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
  * Preemptive opening of compaction result (CASSANDRA-6916)
+ * Multi-threaded scrub/cleanup/upgradesstables (CASSANDRA-5547)
 Merged from 2.0:
  * Set JMX RMI port to 7199 (CASSANDRA-7087)
  * Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c8dcc751/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 30d564c..792c962 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -42,6 +42,7 @@ import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
+import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ConcurrentHashMultiset;
@@ -240,127 +241,130 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    private abstract static class UnmarkingRunnable implements Runnable
+    private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation) throws ExecutionException, InterruptedException
     {
-        private final ColumnFamilyStore cfs;
-        private final Iterable<SSTableReader> sstables;
-
-        private UnmarkingRunnable(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables)
+        Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting();
+        if (compactingSSTables == null)
         {
-            this.cfs = cfs;
-            this.sstables = sstables;
+            logger.info("Aborting operation on {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
+            return AllSSTableOpStatus.ABORTED;
         }
-
-        protected abstract void runMayThrow() throws IOException;
-
-        public final void run()
+        if (Iterables.isEmpty(compactingSSTables))
         {
-            try
-            {
-                runMayThrow();
-            }
-            catch (Exception e)
-            {
-                throw Throwables.propagate(e);
-            }
-            finally
+            logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
+            return AllSSTableOpStatus.SUCCESSFUL;
+        }
+        try
+        {
+            Iterable<SSTableReader> sstables = operation.filterSSTables(compactingSSTables);
+            List<Future<Object>> futures = new ArrayList<>();
+
+            for (final SSTableReader sstable : sstables)
             {
-                cfs.getDataTracker().unmarkCompacting(sstables);
+                futures.add(executor.submit(new Callable<Object>()
+                {
+                    @Override
+                    public Object call() throws Exception
+                    {
+                        operation.execute(sstable);
+                        return this;
+                    }
+                }));
             }
+
+            for (Future<Object> f : futures)
+                f.get();
+        }
+        finally
+        {
+            cfs.getDataTracker().unmarkCompacting(compactingSSTables);
         }
+        return AllSSTableOpStatus.SUCCESSFUL;
+    }
+
+    private static interface OneSSTableOperation
+    {
+        Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input);
+        void execute(SSTableReader input) throws IOException;
     }
 
     public enum AllSSTableOpStatus { ABORTED, SUCCESSFUL }
 
     public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted) throws InterruptedException, ExecutionException
     {
-        final Iterable<SSTableReader> sstables = cfs.markAllCompacting();
-        if (sstables == null)
-        {
-            logger.info("Aborting scrub of {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
-            return AllSSTableOpStatus.ABORTED;
-        }
-        if (Iterables.isEmpty(sstables))
+        assert !cfs.isIndex();
+        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
         {
-            logger.info("No sstables to scrub for {}.{}", cfs.keyspace.getName(), cfs.name);
-            return AllSSTableOpStatus.SUCCESSFUL;
-        }
+            @Override
+            public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input)
+            {
+                return input;
+            }
 
-        Runnable runnable = new UnmarkingRunnable(cfs, sstables)
-        {
-            protected void runMayThrow() throws IOException
+            @Override
+            public void execute(SSTableReader input) throws IOException
             {
-                doScrub(cfs, sstables, skipCorrupted);
+                scrubOne(cfs, input, skipCorrupted);
             }
-        };
-        executor.submit(runnable).get();
-        return AllSSTableOpStatus.SUCCESSFUL;
+        });
     }
 
     public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException
     {
-        final Iterable<SSTableReader> sstables = cfs.markAllCompacting();
-        if (sstables == null)
-        {
-            logger.info("Aborting sstable format upgrade of {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
-            return AllSSTableOpStatus.ABORTED;
-        }
-        if (Iterables.isEmpty(sstables))
-        {
-            logger.info("No sstables to upgrade for {}.{}", cfs.keyspace.getName(), cfs.name);
-            return AllSSTableOpStatus.SUCCESSFUL;
-        }
-
-        Runnable runnable = new UnmarkingRunnable(cfs, sstables)
+        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
         {
-            protected void runMayThrow() throws IOException
+            @Override
+            public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input)
             {
-                for (final SSTableReader sstable : sstables)
+                return Iterables.filter(input, new Predicate<SSTableReader>()
                 {
-                    if (excludeCurrentVersion && sstable.descriptor.version.equals(Descriptor.Version.CURRENT))
-                        continue;
-
-                    // SSTables are marked by the caller
-                    // NOTE: it is important that the task create one and only one sstable, even for Leveled compaction (see LeveledManifest.replace())
-                    AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(Collections.singleton(sstable), NO_GC, Long.MAX_VALUE);
-                    task.setUserDefined(true);
-                    task.setCompactionType(OperationType.UPGRADE_SSTABLES);
-                    task.execute(metrics);
-                }
+                    @Override
+                    public boolean apply(SSTableReader sstable)
+                    {
+                        return !(excludeCurrentVersion && sstable.descriptor.version.equals(Descriptor.Version.CURRENT));
+                    }
+                });
             }
-        };
-        executor.submit(runnable).get();
-        return AllSSTableOpStatus.SUCCESSFUL;
+
+            @Override
+            public void execute(SSTableReader input) throws IOException
+            {
+                AbstractCompactionTask task = cfs.getCompactionStrategy().getCompactionTask(Collections.singleton(input), NO_GC, Long.MAX_VALUE);
+                task.setUserDefined(true);
+                task.setCompactionType(OperationType.UPGRADE_SSTABLES);
+                task.execute(metrics);
+            }
+        });
     }
 
     public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
     {
-        final Iterable<SSTableReader> sstables = cfStore.markAllCompacting();
-        if (sstables == null)
+        assert !cfStore.isIndex();
+        Keyspace keyspace = cfStore.keyspace;
+        final Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
+        if (ranges.isEmpty())
         {
-            logger.info("Aborting cleanup of {}.{} after failing to interrupt other compaction operations", cfStore.keyspace.getName(), cfStore.name);
+            logger.info("Cleanup cannot run before a node has joined the ring");
             return AllSSTableOpStatus.ABORTED;
         }
-        if (Iterables.isEmpty(sstables))
-        {
-            logger.info("No sstables to cleanup for {}.{}", cfStore.keyspace.getName(), cfStore.name);
-            return AllSSTableOpStatus.SUCCESSFUL;
-        }
-
-        Runnable runnable = new UnmarkingRunnable(cfStore, sstables)
+        final boolean hasIndexes = cfStore.indexManager.hasIndexes();
+        final CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges);
+        return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
         {
-            protected void runMayThrow() throws IOException
+            @Override
+            public Iterable<SSTableReader> filterSSTables(Iterable<SSTableReader> input)
             {
-                // Sort the column families in order of SSTable size, so cleanup of smaller CFs
-                // can free up space for larger ones
-                List<SSTableReader> sortedSSTables = Lists.newArrayList(sstables);
+                List<SSTableReader> sortedSSTables = Lists.newArrayList(input);
                 Collections.sort(sortedSSTables, new SSTableReader.SizeComparator());
+                return sortedSSTables;
+            }
 
-                doCleanupCompaction(cfStore, sortedSSTables);
+            @Override
+            public void execute(SSTableReader input) throws IOException
+            {
+                doCleanupOne(cfStore, input, cleanupStrategy, ranges, hasIndexes);
             }
-        };
-        executor.submit(runnable).get();
-        return AllSSTableOpStatus.SUCCESSFUL;
+        });
     }
 
     public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
@@ -567,20 +571,6 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    /**
-     * Deserialize everything in the CFS and re-serialize w/ the newest version.  Also attempts to recover
-     * from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted
-     * from early ByteBuffer bugs.
-     *
-     * @throws IOException
-     */
-    private void doScrub(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables, boolean skipCorrupted) throws IOException
-    {
-        assert !cfs.isIndex();
-        for (final SSTableReader sstable : sstables)
-            scrubOne(cfs, sstable, skipCorrupted);
-    }
-
     private void scrubOne(ColumnFamilyStore cfs, SSTableReader sstable, boolean skipCorrupted) throws IOException
     {
         Scrubber scrubber = new Scrubber(cfs, sstable, skipCorrupted, false);
@@ -652,108 +642,94 @@ public class CompactionManager implements CompactionManagerMBean
     }
 
     /**
-     * This function goes over each file and removes the keys that the node is not responsible for
+     * This function goes over a file and removes the keys that the node is not responsible for
      * and only keeps keys that this node is responsible for.
      *
      * @throws IOException
      */
-    private void doCleanupCompaction(final ColumnFamilyStore cfs, Collection<SSTableReader> sstables) throws IOException
+    private void doCleanupOne(final ColumnFamilyStore cfs, SSTableReader sstable, CleanupStrategy cleanupStrategy, Collection<Range<Token>> ranges, boolean hasIndexes) throws IOException
     {
         assert !cfs.isIndex();
-        Keyspace keyspace = cfs.keyspace;
-        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
-        if (ranges.isEmpty())
+
+        if (!hasIndexes && !new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges))
         {
-            logger.info("Cleanup cannot run before a node has joined the ring");
+            cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
             return;
         }
-
-        boolean hasIndexes = cfs.indexManager.hasIndexes();
-        CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, ranges);
-
-        for (SSTableReader sstable : sstables)
+        if (!needsCleanup(sstable, ranges))
         {
-            if (!hasIndexes && !new Bounds<>(sstable.first.token, sstable.last.token).intersects(ranges))
-            {
-                cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
-                continue;
-            }
-            if (!needsCleanup(sstable, ranges))
-            {
-                logger.debug("Skipping {} for cleanup; all rows should be kept", sstable);
-                continue;
-            }
+            logger.debug("Skipping {} for cleanup; all rows should be kept", sstable);
+            return;
+        }
 
-            CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs));
-            long start = System.nanoTime();
+        long start = System.nanoTime();
 
-            long totalkeysWritten = 0;
+        long totalkeysWritten = 0;
 
-            int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
-                                                   (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
-            if (logger.isDebugEnabled())
-                logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
+        int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
+                                               (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+        if (logger.isDebugEnabled())
+            logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
 
-            logger.info("Cleaning up {}", sstable);
+        logger.info("Cleaning up {}", sstable);
 
-            File compactionFileLocation = cfs.directories.getDirectoryForCompactedSSTables();
-            if (compactionFileLocation == null)
-                throw new IOException("disk full");
+        File compactionFileLocation = cfs.directories.getDirectoryForCompactedSSTables();
+        if (compactionFileLocation == null)
+            throw new IOException("disk full");
 
-            ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
-            CleanupInfo ci = new CleanupInfo(sstable, scanner);
+        ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
+        CleanupInfo ci = new CleanupInfo(sstable, scanner);
 
-            metrics.beginCompaction(ci);
-            SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
+        metrics.beginCompaction(ci);
+        SSTableRewriter writer = new SSTableRewriter(cfs, new HashSet<>(ImmutableSet.of(sstable)), sstable.maxDataAge, OperationType.CLEANUP, false);
 
-            try
+        try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs)))
+        {
+            writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+
+            while (scanner.hasNext())
             {
-                writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+                if (ci.isStopRequested())
+                    throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                while (scanner.hasNext())
-                {
-                    if (ci.isStopRequested())
-                        throw new CompactionInterruptedException(ci.getCompactionInfo());
-
-                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-                    row = cleanupStrategy.cleanup(row);
-                    if (row == null)
-                        continue;
-                    AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
-                    if (writer.append(compactedRow) != null)
-                        totalkeysWritten++;
-                }
+                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+                row = cleanupStrategy.cleanup(row);
+                if (row == null)
+                    continue;
+                AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(row));
+                if (writer.append(compactedRow) != null)
+                    totalkeysWritten++;
+            }
 
-                // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
-                cfs.indexManager.flushIndexesBlocking();
+            // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
+            cfs.indexManager.flushIndexesBlocking();
 
-                writer.finish();
-            }
-            catch (Throwable e)
-            {
-                writer.abort();
-                throw Throwables.propagate(e);
-            }
-            finally
-            {
-                controller.close();
-                scanner.close();
-                metrics.finishCompaction(ci);
-            }
+            writer.finish();
+        }
+        catch (Throwable e)
+        {
+            writer.abort();
+            throw Throwables.propagate(e);
+        }
+        finally
+        {
+            scanner.close();
+            metrics.finishCompaction(ci);
+        }
 
-            List<SSTableReader> results = writer.finished();
-            if (!results.isEmpty())
-            {
-                String format = "Cleaned up to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.";
-                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                long startsize = sstable.onDiskLength();
-                long endsize = 0;
-                for (SSTableReader newSstable : results)
-                    endsize += newSstable.onDiskLength();
-                double ratio = (double) endsize / (double) startsize;
-                logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
-            }
+        List<SSTableReader> results = writer.finished();
+        if (!results.isEmpty())
+        {
+            String format = "Cleaned up to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.";
+            long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            long startsize = sstable.onDiskLength();
+            long endsize = 0;
+            for (SSTableReader newSstable : results)
+                endsize += newSstable.onDiskLength();
+            double ratio = (double) endsize / (double) startsize;
+            logger.info(String.format(format, results.get(0).getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
         }
+
     }
 
     private static abstract class CleanupStrategy


[2/2] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d0ddff1d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d0ddff1d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d0ddff1d

Branch: refs/heads/trunk
Commit: d0ddff1d3ed44bdd3a902dd208207a810b1d1341
Parents: 7000efa c8dcc75
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Apr 28 09:22:23 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Apr 28 09:22:23 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        | 328 +++++++++----------
 2 files changed, 153 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d0ddff1d/CHANGES.txt
----------------------------------------------------------------------