You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/03/08 09:48:09 UTC

[2/4] git commit: Don't promote sstables for cleanup, scrub and upgradesstables

Don't promote sstables for cleanup, scrub and upgradesstables

patch by slebresne; reviewed by jbellis for CASSANDRA-3989


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65059cf4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65059cf4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65059cf4

Branch: refs/heads/trunk
Commit: 65059cf48e0794e0459b1882961616f55382c756
Parents: 09f091a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Mar 7 18:35:15 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Mar 8 09:46:09 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   11 +++--
 src/java/org/apache/cassandra/db/DataTracker.java  |   15 +++---
 .../db/compaction/AbstractCompactionTask.java      |    8 +++
 .../cassandra/db/compaction/CompactionManager.java |    8 ++-
 .../cassandra/db/compaction/CompactionTask.java    |    8 ++--
 .../db/compaction/LeveledCompactionStrategy.java   |   13 +++++-
 .../cassandra/db/compaction/LeveledManifest.java   |   35 +++++++++++---
 .../cassandra/db/compaction/OperationType.java     |    1 +
 .../SSTableListChangedNotification.java            |    7 +++-
 .../cassandra/db/compaction/CompactionsTest.java   |    3 +-
 11 files changed, 81 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16f8f0c..bf8d9fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,6 +45,8 @@ Merged from 1.0:
  * Pig: Composite column support (CASSANDRA-384)
  * Avoid NPE during repair when a keyspace has no CFs (CASSANDRA-3988)
  * Fix division-by-zero error on get_slice (CASSANDRA-4000)
+ * don't change manifest level for cleanup, scrub, and upgradesstables
+   operations under LeveledCompactionStrategy (CASSANDRA-3989)
 
 
 1.1-beta1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 2872d4f..48cdb8c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.filter.ExtendedFilter;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
@@ -937,15 +938,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this);
     }
 
-    public void markCompacted(Collection<SSTableReader> sstables)
+    public void markCompacted(Collection<SSTableReader> sstables, OperationType compactionType)
     {
         assert !sstables.isEmpty();
-        data.markCompacted(sstables);
+        data.markCompacted(sstables, compactionType);
     }
 
-    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements)
+    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)
     {
-        data.replaceCompactedSSTables(sstables, replacements);
+        data.replaceCompactedSSTables(sstables, replacements, compactionType);
     }
 
     void replaceFlushed(Memtable memtable, SSTableReader sstable)
@@ -1966,6 +1967,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         if (!truncatedSSTables.isEmpty())
-            markCompacted(truncatedSSTables);
+            markCompacted(truncatedSSTables, OperationType.UNKNOWN);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index d346eab..13c3369 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
@@ -237,16 +238,16 @@ public class DataTracker
         while (!view.compareAndSet(currentView, newView));
     }
 
-    public void markCompacted(Collection<SSTableReader> sstables)
+    public void markCompacted(Collection<SSTableReader> sstables, OperationType compactionType)
     {
         replace(sstables, Collections.<SSTableReader>emptyList());
-        notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList());
+        notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
     }
 
-    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements)
+    public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)
     {
         replace(sstables, replacements);
-        notifySSTablesChanged(sstables, replacements);
+        notifySSTablesChanged(sstables, replacements, compactionType);
     }
 
     public void addInitialSSTables(Collection<SSTableReader> sstables)
@@ -286,7 +287,7 @@ public class DataTracker
             // notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
             return;
         }
-        notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet());
+        notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
         postReplace(notCompacting, Collections.<SSTableReader>emptySet());
     }
 
@@ -518,11 +519,11 @@ public class DataTracker
         return (double) falseCount / (trueCount + falseCount);
     }
 
-    public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added)
+    public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)
     {
         for (INotificationConsumer subscriber : subscribers)
         {
-            INotification notification = new SSTableListChangedNotification(added, removed);
+            INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
             subscriber.handleNotification(notification, this);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index e031e07..9826941 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -30,12 +30,14 @@ public abstract class AbstractCompactionTask
     protected final ColumnFamilyStore cfs;
     protected Collection<SSTableReader> sstables;
     protected boolean isUserDefined;
+    protected OperationType compactionType;
 
     public AbstractCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
     {
         this.cfs = cfs;
         this.sstables = sstables;
         this.isUserDefined = false;
+        this.compactionType = OperationType.COMPACTION;
     }
 
     public abstract int execute(CompactionExecutorStatsCollector collector) throws IOException;
@@ -93,4 +95,10 @@ public abstract class AbstractCompactionTask
         this.isUserDefined = isUserDefined;
         return this;
     }
+
+    public AbstractCompactionTask setCompactionType(OperationType compactionType)
+    {
+        this.compactionType = compactionType;
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 7a36625..590c3d7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -211,8 +211,10 @@ public class CompactionManager implements CompactionManagerMBean
                 for (final SSTableReader sstable : sstables)
                 {
                     // SSTables are marked by the caller
+                    // NOTE: it is important that the task create one and only one sstable, even for Leveled compaction (see LeveledManifest.replace())
                     CompactionTask task = new CompactionTask(cfs, Collections.singletonList(sstable), Integer.MAX_VALUE);
                     task.isUserDefined(true);
+                    task.setCompactionType(OperationType.UPGRADE_SSTABLES);
                     task.execute(executor);
                 }
             }
@@ -624,7 +626,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         if (newSstable == null)
         {
-            cfs.markCompacted(Arrays.asList(sstable));
+            cfs.markCompacted(Arrays.asList(sstable), OperationType.SCRUB);
             if (badRows > 0)
                 logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
             else
@@ -632,7 +634,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
         else
         {
-            cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
+            cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable), OperationType.SCRUB);
             logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
             if (badRows > 0)
                 logger.warn("Unable to recover " + badRows + " rows that were skipped.  You can attempt manual recovery from the pre-scrub snapshot.  You can also run nodetool repair to transfer the data from a healthy replica, if any");
@@ -777,7 +779,7 @@ public class CompactionManager implements CompactionManagerMBean
             // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
             cfs.indexManager.flushIndexesBlocking();
 
-            cfs.replaceCompactedSSTables(Arrays.asList(sstable), results);
+            cfs.replaceCompactedSSTables(Arrays.asList(sstable), results, OperationType.CLEANUP);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4d77af0..6961490 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -121,8 +121,8 @@ public class CompactionTask extends AbstractCompactionTask
             logger.debug("Expected bloom filter size : " + keysPerSSTable);
 
         AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
-                                      ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
-                                      : new CompactionIterable(OperationType.COMPACTION, toCompact, controller);
+                                      ? new ParallelCompactionIterable(compactionType, toCompact, controller)
+                                      : new CompactionIterable(compactionType, toCompact, controller);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -143,7 +143,7 @@ public class CompactionTask extends AbstractCompactionTask
                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
                 // we need to sync it (via closeAndOpen) first, so there is no period during which
                 // a crash could cause data loss.
-                cfs.markCompacted(toCompact);
+                cfs.markCompacted(toCompact, compactionType);
                 return 0;
             }
 
@@ -199,7 +199,7 @@ public class CompactionTask extends AbstractCompactionTask
                 collector.finishCompaction(ci);
         }
 
-        cfs.replaceCompactedSSTables(toCompact, sstables);
+        cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
         // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
         for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3bbbeb6..0d5bdc7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -132,7 +132,18 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         else if (notification instanceof SSTableListChangedNotification)
         {
             SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
-            manifest.promote(listChangedNotification.removed, listChangedNotification.added);
+            switch (listChangedNotification.compactionType)
+            {
+                // Cleanup, scrub and updateSSTable shouldn't promote (see #3989)
+                case CLEANUP:
+                case SCRUB:
+                case UPGRADE_SSTABLES:
+                    manifest.replace(listChangedNotification.removed, listChangedNotification.added);
+                    break;
+                default:
+                    manifest.promote(listChangedNotification.removed, listChangedNotification.added);
+                    break;
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 3c02cda..bbb41a5 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -60,7 +60,6 @@ public class LeveledManifest
     private final Map<SSTableReader, Integer> sstableGenerations;
     private final RowPosition[] lastCompactedKeys;
     private final int maxSSTableSizeInMB;
-    private int levelCount;
 
     private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB)
     {
@@ -165,11 +164,10 @@ public class LeveledManifest
         int maximumLevel = 0;
         for (SSTableReader sstable : removed)
         {
-            int thisLevel = levelOf(sstable);
+            int thisLevel = remove(sstable);
             assert thisLevel >= 0;
             maximumLevel = Math.max(maximumLevel, thisLevel);
             minimumLevel = Math.min(minimumLevel, thisLevel);
-            remove(sstable);
         }
 
         // it's valid to do a remove w/o an add (e.g. on truncate)
@@ -189,6 +187,22 @@ public class LeveledManifest
         serialize();
     }
 
+    public synchronized void replace(Iterable<SSTableReader> removed, Iterable<SSTableReader> added)
+    {
+        // replace is for compaction operation that don't really change the
+        // content of a sstable (cleanup, scrub) and much replace one sstable by another
+        assert Iterables.size(removed) == 1;
+        assert Iterables.size(added) == 1;
+        SSTableReader toRemove = removed.iterator().next();
+        SSTableReader toAdd = added.iterator().next();
+        logDistribution();
+        if (logger.isDebugEnabled())
+            logger.debug("Replacing " + removed + " by " + toAdd);
+
+        add(toAdd, remove(toRemove));
+        serialize();
+    }
+
     private String toString(Iterable<SSTableReader> sstables)
     {
         StringBuilder builder = new StringBuilder();
@@ -291,12 +305,15 @@ public class LeveledManifest
 
     private void logDistribution()
     {
-        for (int i = 0; i < generations.length; i++)
+        if (logger.isDebugEnabled())
         {
-            if (!generations[i].isEmpty())
+            for (int i = 0; i < generations.length; i++)
             {
-                logger.debug("L{} contains {} SSTables ({} bytes) in {}",
-                             new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this});
+                if (!generations[i].isEmpty())
+                {
+                    logger.debug("L{} contains {} SSTables ({} bytes) in {}",
+                            new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this});
+                }
             }
         }
     }
@@ -310,16 +327,18 @@ public class LeveledManifest
         return level.intValue();
     }
 
-    private void remove(SSTableReader reader)
+    private int remove(SSTableReader reader)
     {
         int level = levelOf(reader);
         assert level >= 0 : reader + " not present in manifest";
         generations[level].remove(reader);
         sstableGenerations.remove(reader);
+        return level;
     }
 
     private void add(SSTableReader sstable, int level)
     {
+        assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
         generations[level].add(sstable);
         sstableGenerations.put(sstable, Integer.valueOf(level));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 75e3d06..79f6c5e 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -25,6 +25,7 @@ public enum OperationType
     ROW_CACHE_SAVE("Row cache save"),
     CLEANUP("Cleanup"),
     SCRUB("Scrub"),
+    UPGRADE_SSTABLES("Upgrade sstables"),
     INDEX_BUILD("Secondary index build"),
     UNKNOWN("Unkown compaction type");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
index 770fd48..ca7ead9 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.notifications;
 
 import org.apache.cassandra.io.sstable.SSTableReader;
 
+import org.apache.cassandra.db.compaction.OperationType;
+
 public class SSTableListChangedNotification implements INotification
 {
     public final Iterable<SSTableReader> removed;
     public final Iterable<SSTableReader> added;
-    public SSTableListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
+    public final OperationType compactionType;
+
+    public SSTableListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed, OperationType compactionType)
     {
         this.removed = removed;
         this.added = added;
+        this.compactionType = compactionType;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a9c66bf..bd49c23 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.*;
@@ -273,7 +274,7 @@ public class CompactionsTest extends CleanupHelper
         assertEquals(2, store.getSSTables().size());
 
         // Now, we remove the sstable that was just created to force the use of EchoedRow (so that it doesn't hide the problem)
-        store.markCompacted(Collections.singleton(tmpSSTable));
+        store.markCompacted(Collections.singleton(tmpSSTable), OperationType.UNKNOWN);
         assertEquals(1, store.getSSTables().size());
 
         // Now assert we do have the 4 keys