Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/06/17 17:00:21 UTC

svn commit: r1136904 - in /cassandra/trunk/src/java/org/apache/cassandra/db: DataTracker.java compaction/AbstractCompactionTask.java compaction/CompactionManager.java compaction/CompactionTask.java

Author: slebresne
Date: Fri Jun 17 15:00:21 2011
New Revision: 1136904

URL: http://svn.apache.org/viewvc?rev=1136904&view=rev
Log:
Fix compaction of the same sstable by multiple threads
patch by slebresne; reviewed by jbellis for CASSANDRA-2769
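
The race this patch closes: the old DataTracker.markCompacting(task) validated
a filtered subset of the task's sstables against the min/max thresholds, but
returned only a boolean, and the task then compacted its original, unfiltered
list. Two concurrent tasks with overlapping lists could therefore both compact
the same sstable. A hypothetical interleaving, for illustration only
('collector' as in CompactionTask.execute below):

    // Old API: boolean result, each task keeps its original sstable list.
    // Suppose sstable S appears in both task1.sstables and task2.sstables.
    if (cfs.getDataTracker().markCompacting(task1))
        task1.execute(collector);   // compacts S
    // meanwhile, on another thread:
    if (cfs.getDataTracker().markCompacting(task2))
        task2.execute(collector);   // may compact S again: task2 was never
                                    // told which subset was actually safe

With this patch, markCompacting returns the exact subset it managed to mark,
and the task narrows itself to that subset before executing.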

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Jun 17 15:00:21 2011
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.AbstractCompactionTask;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
@@ -163,6 +162,9 @@ public class    DataTracker
     {
         if (max < min || max < 1)
             return null;
+        if (tomark == null || tomark.isEmpty())
+            return null;
+
         View currentView, newView;
         Set<SSTableReader> subset = null;
         // order preserving set copy of the input
@@ -190,41 +192,6 @@ public class    DataTracker
         return subset;
     }
 
-    public boolean markCompacting(AbstractCompactionTask task)
-    {
-        ColumnFamilyStore cfs = task.getColumnFamilyStore();
-        return markCompacting(task, cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold());
-    }
-
-    public boolean markCompacting(AbstractCompactionTask task, int min, int max)
-    {
-        Collection<SSTableReader> sstablesToMark = task.getSSTables();
-        if (sstablesToMark == null || sstablesToMark.isEmpty())
-            return false;
-
-        if (max < min || max < 1)
-            return false;
-
-        View currentView, newView;
-        // order preserving set copy of the input
-        Set<SSTableReader> remaining = new LinkedHashSet<SSTableReader>(sstablesToMark);
-        do
-        {
-            currentView = view.get();
-
-            // find the subset that is active and not already compacting
-            remaining.removeAll(currentView.compacting);
-            remaining.retainAll(currentView.sstables);
-            if (remaining.size() < min || remaining.size() > max)
-                // cannot meet the min and max threshold
-                return false;
-
-            newView = currentView.markCompacting(sstablesToMark);
-        }
-        while (!view.compareAndSet(currentView, newView));
-        return true;
-    }
-
     /**
      * Removes files from compacting status: this is different from 'markCompacted'
      * because it should be run regardless of whether a compaction succeeded.
@@ -240,11 +207,6 @@ public class    DataTracker
         while (!view.compareAndSet(currentView, newView));
     }
 
-    public void unmarkCompacting(AbstractCompactionTask task)
-    {
-        unmarkCompacting(task.getSSTables());
-    }
-
     public void markCompacted(Collection<SSTableReader> sstables)
     {
         replace(sstables, Collections.<SSTableReader>emptyList());
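
Pieced together from the hunk above and the deleted boolean variant, the
surviving markCompacting now looks roughly like this (a sketch: the guards,
the order-preserving copy, and the returned subset are visible in the diff;
the rest of the loop body is inferred):

    public Set<SSTableReader> markCompacting(Collection<SSTableReader> tomark, int min, int max)
    {
        if (max < min || max < 1)
            return null;
        if (tomark == null || tomark.isEmpty())
            return null;

        View currentView, newView;
        Set<SSTableReader> subset = null;
        do
        {
            currentView = view.get();
            // order preserving set copy of the input, restricted to sstables
            // that are live and not already being compacted
            subset = new LinkedHashSet<SSTableReader>(tomark);
            subset.removeAll(currentView.compacting);
            subset.retainAll(currentView.sstables);
            if (subset.size() < min || subset.size() > max)
                return null;
            // the fix: mark exactly the validated subset, not the raw input
            newView = currentView.markCompacting(subset);
        }
        while (!view.compareAndSet(currentView, newView));
        return subset;
    }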

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java Fri Jun 17 15:00:21 2011
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
+import java.util.Set;
 import java.io.IOException;
 
 import org.apache.cassandra.io.sstable.SSTableReader;
@@ -47,4 +48,33 @@ public abstract class AbstractCompaction
     {
         return sstables;
     }
+
+    /**
+     * Try to mark the sstables to compact as compacting.
+     * Returns true if some sstables have been marked for compaction, false
+     * otherwise.
+     * This *must* be called before calling execute(). Moreover,
+     * unmarkSSTables *must* always be called after execute() if this
+     * method returns true.
+     */
+    public boolean markSSTablesForCompaction()
+    {
+        return markSSTablesForCompaction(cfs.getMinimumCompactionThreshold(), cfs.getMaximumCompactionThreshold());
+    }
+
+    public boolean markSSTablesForCompaction(int min, int max)
+    {
+        Set<SSTableReader> marked = cfs.getDataTracker().markCompacting(sstables, min, max);
+
+        if (marked == null || marked.isEmpty())
+            return false;
+
+        this.sstables = marked;
+        return true;
+    }
+
+    public void unmarkSSTables()
+    {
+        cfs.getDataTracker().unmarkCompacting(sstables);
+    }
 }
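
Given the contract spelled out in the javadoc above, callers follow a
mark / execute / always-unmark shape. A minimal usage sketch (it mirrors the
CompactionManager hunks below; 'collector' is assumed to be the
CompactionExecutorStatsCollector that CompactionTask.execute takes):

    for (AbstractCompactionTask task : strategy.getBackgroundTasks(gcBefore))
    {
        if (!task.markSSTablesForCompaction())
            continue;                // some sstables are already being compacted
        try
        {
            task.execute(collector); // the task now operates on the subset it marked
        }
        finally
        {
            task.unmarkSSTables();   // mandatory whenever marking succeeded
        }
    }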

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Fri Jun 17 15:00:21 2011
@@ -110,7 +110,7 @@ public class CompactionManager implement
                     AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
                     for (AbstractCompactionTask task : strategy.getBackgroundTasks(getDefaultGcBefore(cfs)))
                     {
-                        if (!cfs.getDataTracker().markCompacting(task))
+                        if (!task.markSSTablesForCompaction())
                             continue;
 
                         try
@@ -119,7 +119,7 @@ public class CompactionManager implement
                         }
                         finally
                         {
-                            cfs.getDataTracker().unmarkCompacting(task);
+                            task.unmarkSSTables();
                         }
                     }
                 }
@@ -246,7 +246,7 @@ public class CompactionManager implement
                     AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
                     for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
                     {
-                        if (!cfStore.getDataTracker().markCompacting(task, 0, Integer.MAX_VALUE))
+                        if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
                             return this;
                         try
                         {
@@ -264,7 +264,7 @@ public class CompactionManager implement
                         }
                         finally
                         {
-                            cfStore.getDataTracker().unmarkCompacting(task);
+                            task.unmarkSSTables();
                         }
                     }
                 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1136904&r1=1136903&r2=1136904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Fri Jun 17 15:00:21 2011
@@ -60,12 +60,18 @@ public class CompactionTask extends Abst
     /**
      * For internal use and testing only.  The rest of the system should go through the submit* methods,
      * which are properly serialized.
+     * Caller is in charge of marking/unmarking the sstables as compacting.
      */
     public int execute(CompactionExecutorStatsCollector collector) throws IOException
     {
+        // The collection of sstables passed may be empty (but not null); even if
+        // it is not empty, it may compact down to nothing if all rows are deleted.
+        assert sstables != null;
+
+        Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
         if (!isUserDefined)
         {
-            if (sstables.size() < 2)
+            if (toCompact.size() < 2)
             {
                 logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + "." +
                             "Use forceUserDefinedCompaction if you wish to force compaction of single sstables " +
@@ -74,18 +80,19 @@ public class CompactionTask extends Abst
             }
 
             if (compactionFileLocation == null)
-                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(sstables));
+                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
 
             // If the compaction file path is null that means we have no space left for this compaction.
             // Try again w/o the largest one.
             if (compactionFileLocation == null)
             {
-                Set<SSTableReader> smallerSSTables = new HashSet<SSTableReader>(sstables);
-                while (compactionFileLocation == null && smallerSSTables.size() > 1)
+                while (compactionFileLocation == null && toCompact.size() > 1)
                 {
-                    logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", "));
-                    smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
-                    compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
+                    logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
+                    // Note that we have removed files that are still marked as compacting. This is suboptimal but ok since the caller will unmark all
+                    // the sstables at the end.
+                    toCompact.remove(cfs.getMaxSizeFile(toCompact));
+                    compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
                 }
             }
 
@@ -96,36 +103,32 @@ public class CompactionTask extends Abst
             }
         }
 
-        // The collection of sstables passed may be empty (but not null); even if
-        // it is not empty, it may compact down to nothing if all rows are deleted.
-        assert sstables != null;
-
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             cfs.table.snapshot(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
 
         // sanity check: all sstables must belong to the same cfs
-        for (SSTableReader sstable : sstables)
+        for (SSTableReader sstable : toCompact)
             assert sstable.descriptor.cfname.equals(cfs.columnFamily);
 
-        CompactionController controller = new CompactionController(cfs, sstables, gcBefore, isUserDefined);
+        CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
         CompactionType type = controller.isMajor()
                             ? CompactionType.MAJOR
                             : CompactionType.MINOR;
-        logger.info("Compacting {}: {}", type, sstables);
+        logger.info("Compacting {}: {}", type, toCompact);
 
         long startTime = System.currentTimeMillis();
         long totalkeysWritten = 0;
 
         // TODO the int cast here is potentially buggy
-        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
+        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(toCompact));
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
         SSTableWriter writer;
-        CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close()
+        CompactionIterator ci = new CompactionIterator(type, toCompact, controller); // retain a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, Predicates.notNull());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
 
@@ -138,11 +141,11 @@ public class CompactionTask extends Abst
                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
                 // we need to sync it (via closeAndOpen) first, so there is no period during which
                 // a crash could cause data loss.
-                cfs.markCompacted(sstables);
+                cfs.markCompacted(toCompact);
                 return 0;
             }
 
-            writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, sstables);
+            writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact);
             while (nni.hasNext())
             {
                 AbstractCompactedRow row = nni.next();
@@ -151,7 +154,7 @@ public class CompactionTask extends Abst
 
                 if (DatabaseDescriptor.getPreheatKeyCache())
                 {
-                    for (SSTableReader sstable : sstables)
+                    for (SSTableReader sstable : toCompact)
                     {
                         if (sstable.getCachedPosition(row.key) != null)
                         {
@@ -169,19 +172,19 @@ public class CompactionTask extends Abst
                 collector.finishCompaction(ci);
         }
 
-        SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables));
-        cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
+        SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
+        cfs.replaceCompactedSSTables(toCompact, Arrays.asList(ssTable));
         for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
             ssTable.cacheKey(entry.getKey(), entry.getValue());
         CompactionManager.instance.submitBackground(cfs);
 
         long dTime = System.currentTimeMillis() - startTime;
-        long startsize = SSTable.getTotalBytes(sstables);
+        long startsize = SSTable.getTotalBytes(toCompact);
         long endsize = ssTable.length();
         double ratio = (double)endsize / (double)startsize;
         logger.info(String.format("Compacted to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.",
-                writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
-        return sstables.size();
+                ssTable.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+        return toCompact.size();
     }
 
     public static long getMaxDataAge(Collection<SSTableReader> sstables)
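
One detail worth calling out in the execute() rewrite above: the method now
works on a copy, toCompact, while this.sstables keeps the full marked set,
which is what makes the out-of-space fallback safe under the new mark/unmark
contract. A condensed sketch of that fallback, assembled from the hunks above:

    // Work on a copy: the fallback below shrinks only toCompact, so the
    // caller's unmarkSSTables() can still unmark every file the task marked,
    // including the ones shed here for lack of disk space.
    Set<SSTableReader> toCompact = new HashSet<SSTableReader>(sstables);
    String compactionFileLocation =
        cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
    while (compactionFileLocation == null && toCompact.size() > 1)
    {
        logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
        toCompact.remove(cfs.getMaxSizeFile(toCompact));
        compactionFileLocation =
            cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(toCompact));
    }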