You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/06/19 16:26:08 UTC

cassandra git commit: Bump SSTable level instead of rewriting SSTable completely during single-sstable compactions

Repository: cassandra
Updated Branches:
  refs/heads/trunk d52bdaefd -> 53c0ef171


Bump SSTable level instead of rewriting SSTable completely during single-sstable compactions

Patch by Marcus Eriksson, reviewed by Alex Petrov for CASSANDRA-12526


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53c0ef17
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53c0ef17
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53c0ef17

Branch: refs/heads/trunk
Commit: 53c0ef171424454c47d64a9326b0ba83cd743a50
Parents: d52bdae
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Apr 16 15:55:11 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Tue Jun 19 18:25:43 2018 +0200

----------------------------------------------------------------------
 .../compaction/AbstractCompactionStrategy.java  |  14 ++
 .../compaction/CompactionStrategyManager.java   |  12 ++
 .../compaction/LeveledCompactionStrategy.java   |  28 +++-
 .../db/compaction/LeveledManifest.java          |  17 +-
 .../db/compaction/SingleSSTableLCSTask.java     | 101 ++++++++++++
 .../apache/cassandra/db/lifecycle/Tracker.java  |   9 ++
 .../notifications/SSTableMetadataChanged.java   |  33 ++++
 .../db/compaction/SingleSSTableLCSTaskTest.java | 155 +++++++++++++++++++
 .../cassandra/db/lifecycle/TrackerTest.java     |   4 +
 .../cassandra/io/sstable/LegacySSTableTest.java |  29 +++-
 10 files changed, 393 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index a43761f..3410f13 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.schema.CompactionParams;
 
 /**
@@ -286,6 +287,19 @@ public abstract class AbstractCompactionStrategy
     @VisibleForTesting
     protected abstract Set<SSTableReader> getSSTables();
 
+    /**
+     * Called when the metadata has changed for an sstable - for example if the level changed
+     *
+     * Not called when repair status changes (which is also metadata), because this results in the
+     * sstable getting removed from the compaction strategy instance.
+     *
+     * @param oldMetadata
+     * @param sstable
+     */
+    public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable)
+    {
+    }
+
     public static class ScannerList implements AutoCloseable
     {
         public final List<ISSTableScanner> scanners;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index b954d5e..81b7c7e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.schema.TableMetadata;
@@ -770,6 +771,12 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
+    private void handleMetadataChangedNotification(SSTableReader sstable, StatsMetadata oldMetadata)
+    {
+        AbstractCompactionStrategy acs = getCompactionStrategyFor(sstable);
+        acs.metadataChanged(oldMetadata, sstable);
+    }
+
     private void handleDeletingNotification(SSTableReader deleted)
     {
         // If reloaded, SSTables will be placed in their correct locations
@@ -806,6 +813,11 @@ public class CompactionStrategyManager implements INotificationConsumer
         {
             handleDeletingNotification(((SSTableDeletingNotification) notification).deleting);
         }
+        else if (notification instanceof SSTableMetadataChanged)
+        {
+            SSTableMetadataChanged lcNotification = (SSTableMetadataChanged) notification;
+            handleMetadataChangedNotification(lcNotification.sstable, lcNotification.oldMetadata);
+        }
     }
 
     public void enable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index b1091ce..a65a20e 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.Config;
@@ -49,18 +50,21 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
     private static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
     private static final boolean tolerateSstableSize = Boolean.getBoolean(Config.PROPERTY_PREFIX + "tolerate_sstable_size");
     private static final String LEVEL_FANOUT_SIZE_OPTION = "fanout_size";
+    private static final String SINGLE_SSTABLE_UPLEVEL_OPTION = "single_sstable_uplevel";
     public static final int DEFAULT_LEVEL_FANOUT_SIZE = 10;
 
     @VisibleForTesting
     final LeveledManifest manifest;
     private final int maxSSTableSizeInMB;
     private final int levelFanoutSize;
+    private final boolean singleSSTableUplevel;
 
     public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {
         super(cfs, options);
         int configuredMaxSSTableSize = 160;
         int configuredLevelFanoutSize = DEFAULT_LEVEL_FANOUT_SIZE;
+        boolean configuredSingleSSTableUplevel = false;
         SizeTieredCompactionStrategyOptions localOptions = new SizeTieredCompactionStrategyOptions(options);
         if (options != null)
         {
@@ -82,9 +86,15 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
             {
                 configuredLevelFanoutSize = Integer.parseInt(options.get(LEVEL_FANOUT_SIZE_OPTION));
             }
+
+            if (options.containsKey(SINGLE_SSTABLE_UPLEVEL_OPTION))
+            {
+                configuredSingleSSTableUplevel = Boolean.parseBoolean(options.get(SINGLE_SSTABLE_UPLEVEL_OPTION));
+            }
         }
         maxSSTableSizeInMB = configuredMaxSSTableSize;
         levelFanoutSize = configuredLevelFanoutSize;
+        singleSSTableUplevel = configuredSingleSSTableUplevel;
 
         manifest = new LeveledManifest(cfs, this.maxSSTableSizeInMB, this.levelFanoutSize, localOptions);
         logger.trace("Created {}", manifest);
@@ -151,7 +161,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
             LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION);
             if (txn != null)
             {
-                LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false);
+                AbstractCompactionTask newTask;
+                if (!singleSSTableUplevel || op == OperationType.TOMBSTONE_COMPACTION || txn.originals().size() > 1)
+                    newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false);
+                else
+                    newTask = new SingleSSTableLCSTask(cfs, txn, candidate.level);
+
                 newTask.setCompactionType(op);
                 return newTask;
             }
@@ -333,6 +348,13 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
     }
 
     @Override
+    public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable)
+    {
+        if (sstable.getSSTableLevel() != oldMetadata.sstableLevel)
+            manifest.newLevel(sstable, oldMetadata.sstableLevel);
+    }
+
+    @Override
     public void addSSTable(SSTableReader added)
     {
         manifest.add(added);
@@ -569,6 +591,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         }
 
         uncheckedOptions.remove(LEVEL_FANOUT_SIZE_OPTION);
+        uncheckedOptions.remove(SINGLE_SSTABLE_UPLEVEL_OPTION);
+
+        uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
+        uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
 
         uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
         uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 291973f..c01dd69 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -49,11 +49,6 @@ public class LeveledManifest
     private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class);
 
     /**
-     * if we have more than MAX_COMPACTING_L0 sstables in L0, we will run a round of STCS with at most
-     * cfs.getMaxCompactionThreshold() sstables.
-     */
-    private static final int MAX_COMPACTING_L0 = 32;
-    /**
      * If we go this many rounds without compacting
      * in the highest level, we start bringing in sstables from
      * that level into lower level compactions
@@ -420,7 +415,7 @@ public class LeveledManifest
 
     private CompactionCandidate getSTCSInL0CompactionCandidate()
     {
-        if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0)
+        if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > cfs.getMaximumCompactionThreshold())
         {
             List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0));
             if (!mostInteresting.isEmpty())
@@ -818,7 +813,7 @@ public class LeveledManifest
             tasks += estimated[i];
         }
 
-        if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0)
+        if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > cfs.getMaximumCompactionThreshold())
         {
             int l0compactions = getLevel(0).size() / cfs.getMaximumCompactionThreshold();
             tasks += l0compactions;
@@ -864,6 +859,14 @@ public class LeveledManifest
         return sstables;
     }
 
+    public synchronized void newLevel(SSTableReader sstable, int oldLevel)
+    {
+        boolean removed = generations[oldLevel].remove(sstable);
+        assert removed : "Could not remove " + sstable +" from " + oldLevel;
+        add(sstable);
+        lastCompactedKeys[oldLevel] = sstable.last;
+    }
+
     public static class CompactionCandidate
     {
         public final Collection<SSTableReader> sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
new file mode 100644
index 0000000..3522d61
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+
+/**
+ * Special compaction task that does not do any compaction, instead it
+ * just mutates the level metadata on the sstable and notifies the compaction
+ * strategy.
+ */
+public class SingleSSTableLCSTask extends AbstractCompactionTask
+{
+    private static final Logger logger = LoggerFactory.getLogger(SingleSSTableLCSTask.class);
+
+    private final int level;
+
+    public SingleSSTableLCSTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int level)
+    {
+        super(cfs, txn);
+        assert txn.originals().size() == 1;
+        this.level = level;
+    }
+
+    @Override
+    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+    {
+        throw new UnsupportedOperationException("This method should never be called on SingleSSTableLCSTask");
+    }
+
+    @Override
+    protected int executeInternal(CompactionManager.CompactionExecutorStatsCollector collector)
+    {
+        run();
+        return 1;
+    }
+
+    @Override
+    protected void runMayThrow()
+    {
+        SSTableReader sstable = transaction.onlyOne();
+        StatsMetadata metadataBefore = sstable.getSSTableMetadata();
+        if (level == metadataBefore.sstableLevel)
+        {
+            logger.info("Not compacting {}, level is already {}", sstable, level);
+        }
+        else
+        {
+            try
+            {
+                logger.info("Changing level on {} from {} to {}", sstable, metadataBefore.sstableLevel, level);
+                sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, level);
+                sstable.reloadSSTableMetadata();
+            }
+            catch (Throwable t)
+            {
+                transaction.abort();
+                throw new CorruptSSTableException(t, sstable.descriptor.filenameFor(Component.DATA));
+            }
+            cfs.getTracker().notifySSTableMetadataChanged(sstable, metadataBefore);
+        }
+        finishTransaction(sstable);
+    }
+
+    private void finishTransaction(SSTableReader sstable)
+    {
+        // we simply cancel the transaction since no sstables are added or removed - we just
+        // write a new sstable metadata above and then atomically move the new file on top of the old
+        transaction.cancel(sstable);
+        transaction.prepareToCommit();
+        transaction.commit();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index c578da2..3ae6eaf 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.notifications.*;
@@ -446,6 +447,14 @@ public class Tracker
             subscriber.handleNotification(notification, this);
     }
 
+    public void notifySSTableMetadataChanged(SSTableReader levelChanged, StatsMetadata oldMetadata)
+    {
+        INotification notification = new SSTableMetadataChanged(levelChanged, oldMetadata);
+        for (INotificationConsumer subscriber : subscribers)
+            subscriber.handleNotification(notification, this);
+
+    }
+
     public void notifyDeleting(SSTableReader deleting)
     {
         INotification notification = new SSTableDeletingNotification(deleting);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java b/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java
new file mode 100644
index 0000000..83cfe60
--- /dev/null
+++ b/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.notifications;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+
+public class SSTableMetadataChanged implements INotification
+{
+    public final SSTableReader sstable;
+    public final StatsMetadata oldMetadata;
+
+    public SSTableMetadataChanged(SSTableReader levelChanged, StatsMetadata oldMetadata)
+    {
+        this.sstable = levelChanged;
+        this.oldMetadata = oldMetadata;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
new file mode 100644
index 0000000..6428ab7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SingleSSTableLCSTaskTest extends CQLTester
+{
+    @Test
+    public void basicTest() throws Throwable
+    {
+        createTable("create table %s (id int primary key, t text) with compaction = {'class':'LeveledCompactionStrategy','single_sstable_uplevel':true}");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        execute("insert into %s (id, t) values (1, 'meep')");
+        cfs.forceBlockingFlush();
+        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.COMPACTION))
+        {
+            if (txn != null)
+            {
+                SingleSSTableLCSTask task = new SingleSSTableLCSTask(cfs, txn, 2);
+                task.executeInternal(null);
+            }
+        }
+        assertEquals(1, cfs.getLiveSSTables().size());
+        cfs.getLiveSSTables().forEach(s -> assertEquals(2, s.getSSTableLevel()));
+        // make sure compaction strategy is notified:
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next();
+        for (int i = 0; i < lcs.manifest.getLevelCount(); i++)
+        {
+            if (i == 2)
+                assertEquals(1, lcs.getLevelSize(i));
+            else
+                assertEquals(0, lcs.getLevelSize(i));
+        }
+        assertTrue(cfs.getTracker().getCompacting().isEmpty());
+    }
+
+    @Test
+    public void compactionTest() throws Throwable
+    {
+        compactionTestHelper(true);
+    }
+
+    @Test
+    public void uplevelDisabledTest() throws Throwable
+    {
+        compactionTestHelper(false);
+    }
+
+    private void compactionTestHelper(boolean singleSSTUplevel) throws Throwable
+    {
+        createTable("create table %s (id int, id2 int, t blob, primary key (id, id2))" +
+                    "with compaction = {'class':'LeveledCompactionStrategy', 'single_sstable_uplevel':" + singleSSTUplevel + ", 'sstable_size_in_mb':'1', 'max_threshold':'1000'}");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+        byte[] b = new byte[10 * 1024];
+        new Random().nextBytes(b);
+        ByteBuffer value = ByteBuffer.wrap(b);
+        for (int i = 0; i < 5000; i++)
+        {
+            for (int j = 0; j < 10; j++)
+            {
+                execute("insert into %s (id, id2, t) values (?, ?, ?)", i, j, value);
+            }
+            if (i % 100 == 0)
+                cfs.forceBlockingFlush();
+        }
+        // now we have a bunch of data in L0, first compaction will be a normal one, containing all sstables:
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+        AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
+        act.execute(null);
+
+        // now all sstables are laid out non-overlapping in L1, this means that the rest of the compactions
+        // will be single sstable ones, make sure that we use SingleSSTableLCSTask if singleSSTUplevel is true:
+        while (lcs.getEstimatedRemainingTasks() > 0)
+        {
+            act = lcs.getNextBackgroundTask(0);
+            assertEquals(singleSSTUplevel, act instanceof SingleSSTableLCSTask);
+            act.execute(null);
+        }
+        assertEquals(0, lcs.getLevelSize(0));
+        int l1size = lcs.getLevelSize(1);
+        // this should be 10, but it might vary a bit depending on partition sizes etc
+        assertTrue(l1size >= 8 && l1size <= 12);
+        assertTrue(lcs.getLevelSize(2) > 0);
+    }
+
+    @Test
+    public void corruptMetadataTest() throws Throwable
+    {
+        createTable("create table %s (id int primary key, t text) with compaction = {'class':'LeveledCompactionStrategy','single_sstable_uplevel':true}");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        execute("insert into %s (id, t) values (1, 'meep')");
+        cfs.forceBlockingFlush();
+        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+
+        String filenameToCorrupt = sstable.descriptor.filenameFor(Component.STATS);
+        RandomAccessFile file = new RandomAccessFile(filenameToCorrupt, "rw");
+        file.seek(0);
+        file.writeBytes(StringUtils.repeat('z', 2));
+        file.close();
+        boolean gotException = false;
+        try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.COMPACTION))
+        {
+            if (txn != null)
+            {
+                SingleSSTableLCSTask task = new SingleSSTableLCSTask(cfs, txn, 2);
+                task.executeInternal(null);
+            }
+        }
+        catch (Throwable t)
+        {
+            gotException = true;
+        }
+        assertTrue(gotException);
+        assertEquals(1, cfs.getLiveSSTables().size());
+        for (SSTableReader sst : cfs.getLiveSSTables())
+            assertEquals(0, sst.getSSTableMetadata().sstableLevel);
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next();
+        assertEquals(1, lcs.getLevelSize(0));
+        assertTrue(cfs.getTracker().getCompacting().isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index bee322c..2891126 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -359,6 +359,10 @@ public class TrackerTest
         tracker.notifyRenewed(memtable);
         Assert.assertEquals(memtable, ((MemtableRenewedNotification) listener.received.get(0)).renewed);
         listener.received.clear();
+        tracker.notifySSTableMetadataChanged(r1, r1.getSSTableMetadata());
+        Assert.assertEquals(((SSTableMetadataChanged)listener.received.get(0)).sstable, r1);
+        Assert.assertEquals(r1.getSSTableMetadata(), ((SSTableMetadataChanged)listener.received.get(0)).oldMetadata);
+        listener.received.clear();
         tracker.unsubscribe(listener);
         MockListener failListener = new MockListener(true);
         tracker.subscribe(failListener);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 13d3eac..fcc9191 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -172,8 +172,35 @@ public class LegacySSTableTest
             {
                 for (SSTableReader sstable : cfs.getLiveSSTables())
                 {
-                    sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1234, UUID.randomUUID());
+                    UUID random = UUID.randomUUID();
+                    sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1234, random);
                     sstable.reloadSSTableMetadata();
+                    assertEquals(1234, sstable.getRepairedAt());
+                    if (sstable.descriptor.version.hasPendingRepair())
+                        assertEquals(random, sstable.getPendingRepair());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testMutateLevel() throws Exception
+    {
+        // we need to make sure we write old version metadata in the format for that version
+        for (String legacyVersion : legacyVersions)
+        {
+            logger.info("Loading legacy version: {}", legacyVersion);
+            truncateLegacyTables(legacyVersion);
+            loadLegacyTables(legacyVersion);
+            CacheService.instance.invalidateKeyCache();
+
+            for (ColumnFamilyStore cfs : Keyspace.open("legacy_tables").getColumnFamilyStores())
+            {
+                for (SSTableReader sstable : cfs.getLiveSSTables())
+                {
+                    sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1234);
+                    sstable.reloadSSTableMetadata();
+                    assertEquals(1234, sstable.getSSTableLevel());
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org