You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2018/06/19 16:26:08 UTC
cassandra git commit: Bump SSTable level instead of rewriting SSTable
completely during single-sstable compactions
Repository: cassandra
Updated Branches:
refs/heads/trunk d52bdaefd -> 53c0ef171
Bump SSTable level instead of rewriting SSTable completely during single-sstable compactions
Patch by Marcus Eriksson, reviewed by Alex Petrov for CASSANDRA-12526
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53c0ef17
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53c0ef17
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53c0ef17
Branch: refs/heads/trunk
Commit: 53c0ef171424454c47d64a9326b0ba83cd743a50
Parents: d52bdae
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Apr 16 15:55:11 2018 +0200
Committer: Alex Petrov <ol...@gmail.com>
Committed: Tue Jun 19 18:25:43 2018 +0200
----------------------------------------------------------------------
.../compaction/AbstractCompactionStrategy.java | 14 ++
.../compaction/CompactionStrategyManager.java | 12 ++
.../compaction/LeveledCompactionStrategy.java | 28 +++-
.../db/compaction/LeveledManifest.java | 17 +-
.../db/compaction/SingleSSTableLCSTask.java | 101 ++++++++++++
.../apache/cassandra/db/lifecycle/Tracker.java | 9 ++
.../notifications/SSTableMetadataChanged.java | 33 ++++
.../db/compaction/SingleSSTableLCSTaskTest.java | 155 +++++++++++++++++++
.../cassandra/db/lifecycle/TrackerTest.java | 4 +
.../cassandra/io/sstable/LegacySSTableTest.java | 29 +++-
10 files changed, 393 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index a43761f..3410f13 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.schema.CompactionParams;
/**
@@ -286,6 +287,19 @@ public abstract class AbstractCompactionStrategy
@VisibleForTesting
protected abstract Set<SSTableReader> getSSTables();
+ /**
+ * Called when the metadata has changed for an sstable - for example if the level changed.
+ *
+ * Not called when repair status changes (which is also metadata), because this results in the
+ * sstable getting removed from the compaction strategy instance.
+ *
+ * @param oldMetadata the sstable's stats metadata as it was before the change
+ * @param sstable the sstable whose metadata changed
+ */
+ public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable)
+ { // intentionally a no-op by default; strategies that care about metadata changes override this
+ }
+
public static class ScannerList implements AutoCloseable
{
public final List<ISSTableScanner> scanners;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index b954d5e..81b7c7e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.notifications.*;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.TableMetadata;
@@ -770,6 +771,12 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
+ private void handleMetadataChangedNotification(SSTableReader sstable, StatsMetadata oldMetadata) // forwards a metadata change to the strategy responsible for the sstable
+ {
+ AbstractCompactionStrategy acs = getCompactionStrategyFor(sstable); // look up the strategy instance for this sstable
+ acs.metadataChanged(oldMetadata, sstable); // note the argument order: old metadata first, then the sstable
+ }
+
private void handleDeletingNotification(SSTableReader deleted)
{
// If reloaded, SSTables will be placed in their correct locations
@@ -806,6 +813,11 @@ public class CompactionStrategyManager implements INotificationConsumer
{
handleDeletingNotification(((SSTableDeletingNotification) notification).deleting);
}
+ else if (notification instanceof SSTableMetadataChanged)
+ {
+ SSTableMetadataChanged lcNotification = (SSTableMetadataChanged) notification;
+ handleMetadataChangedNotification(lcNotification.sstable, lcNotification.oldMetadata);
+ }
}
public void enable()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index b1091ce..a65a20e 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.Config;
@@ -49,18 +50,21 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
private static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
private static final boolean tolerateSstableSize = Boolean.getBoolean(Config.PROPERTY_PREFIX + "tolerate_sstable_size");
private static final String LEVEL_FANOUT_SIZE_OPTION = "fanout_size";
+ private static final String SINGLE_SSTABLE_UPLEVEL_OPTION = "single_sstable_uplevel";
public static final int DEFAULT_LEVEL_FANOUT_SIZE = 10;
@VisibleForTesting
final LeveledManifest manifest;
private final int maxSSTableSizeInMB;
private final int levelFanoutSize;
+ private final boolean singleSSTableUplevel;
public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
super(cfs, options);
int configuredMaxSSTableSize = 160;
int configuredLevelFanoutSize = DEFAULT_LEVEL_FANOUT_SIZE;
+ boolean configuredSingleSSTableUplevel = false;
SizeTieredCompactionStrategyOptions localOptions = new SizeTieredCompactionStrategyOptions(options);
if (options != null)
{
@@ -82,9 +86,15 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
{
configuredLevelFanoutSize = Integer.parseInt(options.get(LEVEL_FANOUT_SIZE_OPTION));
}
+
+ if (options.containsKey(SINGLE_SSTABLE_UPLEVEL_OPTION))
+ {
+ configuredSingleSSTableUplevel = Boolean.parseBoolean(options.get(SINGLE_SSTABLE_UPLEVEL_OPTION));
+ }
}
maxSSTableSizeInMB = configuredMaxSSTableSize;
levelFanoutSize = configuredLevelFanoutSize;
+ singleSSTableUplevel = configuredSingleSSTableUplevel;
manifest = new LeveledManifest(cfs, this.maxSSTableSizeInMB, this.levelFanoutSize, localOptions);
logger.trace("Created {}", manifest);
@@ -151,7 +161,12 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
LifecycleTransaction txn = cfs.getTracker().tryModify(candidate.sstables, OperationType.COMPACTION);
if (txn != null)
{
- LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false);
+ AbstractCompactionTask newTask;
+ if (!singleSSTableUplevel || op == OperationType.TOMBSTONE_COMPACTION || txn.originals().size() > 1)
+ newTask = new LeveledCompactionTask(cfs, txn, candidate.level, gcBefore, candidate.maxSSTableBytes, false);
+ else
+ newTask = new SingleSSTableLCSTask(cfs, txn, candidate.level);
+
newTask.setCompactionType(op);
return newTask;
}
@@ -333,6 +348,13 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
}
@Override
+ public void metadataChanged(StatsMetadata oldMetadata, SSTableReader sstable) // keeps the leveled manifest in sync when an sstable's level is changed in place
+ {
+ if (sstable.getSSTableLevel() != oldMetadata.sstableLevel) // only a level change is relevant here; other metadata changes are ignored
+ manifest.newLevel(sstable, oldMetadata.sstableLevel); // pass the old level so the manifest knows where to remove the sstable from
+ }
+
+ @Override
public void addSSTable(SSTableReader added)
{
manifest.add(added);
@@ -569,6 +591,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
}
uncheckedOptions.remove(LEVEL_FANOUT_SIZE_OPTION);
+ uncheckedOptions.remove(SINGLE_SSTABLE_UPLEVEL_OPTION);
+
+ uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
+ uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 291973f..c01dd69 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -49,11 +49,6 @@ public class LeveledManifest
private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class);
/**
- * if we have more than MAX_COMPACTING_L0 sstables in L0, we will run a round of STCS with at most
- * cfs.getMaxCompactionThreshold() sstables.
- */
- private static final int MAX_COMPACTING_L0 = 32;
- /**
* If we go this many rounds without compacting
* in the highest level, we start bringing in sstables from
* that level into lower level compactions
@@ -420,7 +415,7 @@ public class LeveledManifest
private CompactionCandidate getSTCSInL0CompactionCandidate()
{
- if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0)
+ if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > cfs.getMaximumCompactionThreshold())
{
List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0));
if (!mostInteresting.isEmpty())
@@ -818,7 +813,7 @@ public class LeveledManifest
tasks += estimated[i];
}
- if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0)
+ if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > cfs.getMaximumCompactionThreshold())
{
int l0compactions = getLevel(0).size() / cfs.getMaximumCompactionThreshold();
tasks += l0compactions;
@@ -864,6 +859,14 @@ public class LeveledManifest
return sstables;
}
+ public synchronized void newLevel(SSTableReader sstable, int oldLevel) // re-files an sstable whose level metadata changed; oldLevel is the level it was previously tracked in
+ {
+ boolean removed = generations[oldLevel].remove(sstable); // take it out of the generation it used to live in
+ assert removed : "Could not remove " + sstable +" from " + oldLevel;
+ add(sstable); // NOTE(review): presumably add() files the sstable under its current (new) level - confirm
+ lastCompactedKeys[oldLevel] = sstable.last; // record this sstable's last key as the last compacted key of the old level
+ }
+
public static class CompactionCandidate
{
public final Collection<SSTableReader> sstables;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
new file mode 100644
index 0000000..3522d61
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/SingleSSTableLCSTask.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+
+/**
+ * Special compaction task that does not do any compaction; instead it
+ * just mutates the level metadata on the sstable and notifies the compaction
+ * strategy (through the tracker) about the change.
+ */
+public class SingleSSTableLCSTask extends AbstractCompactionTask
+{
+ private static final Logger logger = LoggerFactory.getLogger(SingleSSTableLCSTask.class);
+
+ private final int level; // target level to write into the sstable's metadata
+
+ public SingleSSTableLCSTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int level)
+ {
+ super(cfs, txn);
+ assert txn.originals().size() == 1; // this task handles exactly one sstable (runMayThrow uses onlyOne())
+ this.level = level;
+ }
+
+ @Override
+ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+ {
+ throw new UnsupportedOperationException("This method should never be called on SingleSSTableLCSTask"); // no sstable is rewritten, so there is no writer to hand out
+ }
+
+ @Override
+ protected int executeInternal(CompactionManager.CompactionExecutorStatsCollector collector)
+ {
+ run(); // NOTE(review): presumably the inherited run() ends up in runMayThrow() - confirm; the collector is not used
+ return 1;
+ }
+
+ @Override
+ protected void runMayThrow()
+ {
+ SSTableReader sstable = transaction.onlyOne();
+ StatsMetadata metadataBefore = sstable.getSSTableMetadata(); // captured before mutating so listeners can be told the old level
+ if (level == metadataBefore.sstableLevel)
+ { // nothing to mutate; we still fall through to finishTransaction() below
+ logger.info("Not compacting {}, level is already {}", sstable, level);
+ }
+ else
+ {
+ try
+ {
+ logger.info("Changing level on {} from {} to {}", sstable, metadataBefore.sstableLevel, level);
+ sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, level);
+ sstable.reloadSSTableMetadata(); // refresh the reader's metadata after the on-disk level change
+ }
+ catch (Throwable t)
+ {
+ transaction.abort(); // give up the transaction before surfacing the failure
+ throw new CorruptSSTableException(t, sstable.descriptor.filenameFor(Component.DATA)); // any failure, whatever its type, is reported against this sstable's data file
+ }
+ cfs.getTracker().notifySSTableMetadataChanged(sstable, metadataBefore); // only reached if the mutation above succeeded
+ }
+ finishTransaction(sstable);
+ }
+
+ private void finishTransaction(SSTableReader sstable)
+ {
+ // we simply cancel the sstable out of the transaction since no sstables are added or removed - we just
+ // wrote new sstable metadata above and atomically moved the new file on top of the old one
+ transaction.cancel(sstable);
+ transaction.prepareToCommit();
+ transaction.commit();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index c578da2..3ae6eaf 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.notifications.*;
@@ -446,6 +447,14 @@ public class Tracker
subscriber.handleNotification(notification, this);
}
+ public void notifySSTableMetadataChanged(SSTableReader levelChanged, StatsMetadata oldMetadata) // publishes an SSTableMetadataChanged notification to every subscriber
+ {
+ INotification notification = new SSTableMetadataChanged(levelChanged, oldMetadata); // one notification instance is shared by all subscribers
+ for (INotificationConsumer subscriber : subscribers)
+ subscriber.handleNotification(notification, this); // this tracker is passed as the sender
+
+ }
+
public void notifyDeleting(SSTableReader deleting)
{
INotification notification = new SSTableDeletingNotification(deleting);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java b/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java
new file mode 100644
index 0000000..83cfe60
--- /dev/null
+++ b/src/java/org/apache/cassandra/notifications/SSTableMetadataChanged.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.notifications;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+
+public class SSTableMetadataChanged implements INotification // notification payload: an sstable together with the metadata it had before the change
+{
+ public final SSTableReader sstable; // the sstable whose metadata changed
+ public final StatsMetadata oldMetadata; // the metadata as it was before the change
+
+ public SSTableMetadataChanged(SSTableReader levelChanged, StatsMetadata oldMetadata)
+ {
+ this.sstable = levelChanged;
+ this.oldMetadata = oldMetadata;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
new file mode 100644
index 0000000..6428ab7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/SingleSSTableLCSTaskTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SingleSSTableLCSTaskTest extends CQLTester
+{
+ @Test
+ public void basicTest() throws Throwable
+ {
+ createTable("create table %s (id int primary key, t text) with compaction = {'class':'LeveledCompactionStrategy','single_sstable_uplevel':true}");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ execute("insert into %s (id, t) values (1, 'meep')");
+ cfs.forceBlockingFlush();
+ SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.COMPACTION))
+ {
+ if (txn != null) // NOTE(review): a null txn silently skips the task; the level assertions below would then fail
+ {
+ SingleSSTableLCSTask task = new SingleSSTableLCSTask(cfs, txn, 2); // ask for the sstable to be moved to level 2
+ task.executeInternal(null);
+ }
+ }
+ assertEquals(1, cfs.getLiveSSTables().size());
+ cfs.getLiveSSTables().forEach(s -> assertEquals(2, s.getSSTableLevel()));
+ // make sure compaction strategy is notified:
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next();
+ for (int i = 0; i < lcs.manifest.getLevelCount(); i++)
+ {
+ if (i == 2)
+ assertEquals(1, lcs.getLevelSize(i));
+ else
+ assertEquals(0, lcs.getLevelSize(i));
+ }
+ assertTrue(cfs.getTracker().getCompacting().isEmpty()); // the sstable must no longer be marked as compacting
+ }
+
+ @Test
+ public void compactionTest() throws Throwable
+ {
+ compactionTestHelper(true);
+ }
+
+ @Test
+ public void uplevelDisabledTest() throws Throwable
+ {
+ compactionTestHelper(false);
+ }
+
+ private void compactionTestHelper(boolean singleSSTUplevel) throws Throwable
+ {
+ createTable("create table %s (id int, id2 int, t blob, primary key (id, id2))" +
+ "with compaction = {'class':'LeveledCompactionStrategy', 'single_sstable_uplevel':" + singleSSTUplevel + ", 'sstable_size_in_mb':'1', 'max_threshold':'1000'}");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ cfs.disableAutoCompaction(); // compactions are driven by hand below
+ byte[] b = new byte[10 * 1024];
+ new Random().nextBytes(b);
+ ByteBuffer value = ByteBuffer.wrap(b);
+ for (int i = 0; i < 5000; i++)
+ {
+ for (int j = 0; j < 10; j++)
+ {
+ execute("insert into %s (id, id2, t) values (?, ?, ?)", i, j, value);
+ }
+ if (i % 100 == 0) // flush on every 100th partition (including i == 0)
+ cfs.forceBlockingFlush();
+ }
+ // now we have a bunch of data in L0, first compaction will be a normal one, containing all sstables:
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().get(0);
+ AbstractCompactionTask act = lcs.getNextBackgroundTask(0);
+ act.execute(null);
+
+ // now all sstables are laid out non-overlapping in L1, this means that the rest of the compactions
+ // will be single sstable ones, make sure that we use SingleSSTableLCSTask if singleSSTUplevel is true:
+ while (lcs.getEstimatedRemainingTasks() > 0)
+ {
+ act = lcs.getNextBackgroundTask(0);
+ assertEquals(singleSSTUplevel, act instanceof SingleSSTableLCSTask);
+ act.execute(null);
+ }
+ assertEquals(0, lcs.getLevelSize(0));
+ int l1size = lcs.getLevelSize(1);
+ // this should be 10, but it might vary a bit depending on partition sizes etc
+ assertTrue(l1size >= 8 && l1size <= 12);
+ assertTrue(lcs.getLevelSize(2) > 0);
+ }
+
+ @Test
+ public void corruptMetadataTest() throws Throwable
+ {
+ createTable("create table %s (id int primary key, t text) with compaction = {'class':'LeveledCompactionStrategy','single_sstable_uplevel':true}");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ execute("insert into %s (id, t) values (1, 'meep')");
+ cfs.forceBlockingFlush();
+ SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
+
+ String filenameToCorrupt = sstable.descriptor.filenameFor(Component.STATS);
+ RandomAccessFile file = new RandomAccessFile(filenameToCorrupt, "rw"); // NOTE(review): not closed if seek/write throws; try-with-resources would be safer
+ file.seek(0);
+ file.writeBytes(StringUtils.repeat('z', 2)); // overwrite the first two bytes of the stats component to corrupt it
+ file.close();
+ boolean gotException = false;
+ try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.COMPACTION))
+ {
+ if (txn != null) // NOTE(review): a null txn skips the task, in which case assertTrue(gotException) below fails
+ {
+ SingleSSTableLCSTask task = new SingleSSTableLCSTask(cfs, txn, 2);
+ task.executeInternal(null);
+ }
+ }
+ catch (Throwable t) // any failure counts; the exact exception type is not asserted
+ {
+ gotException = true;
+ }
+ assertTrue(gotException);
+ assertEquals(1, cfs.getLiveSSTables().size());
+ for (SSTableReader sst : cfs.getLiveSSTables())
+ assertEquals(0, sst.getSSTableMetadata().sstableLevel); // the failed mutation must not have changed the level
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getUnrepaired().iterator().next();
+ assertEquals(1, lcs.getLevelSize(0));
+ assertTrue(cfs.getTracker().getCompacting().isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index bee322c..2891126 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -359,6 +359,10 @@ public class TrackerTest
tracker.notifyRenewed(memtable);
Assert.assertEquals(memtable, ((MemtableRenewedNotification) listener.received.get(0)).renewed);
listener.received.clear();
+ tracker.notifySSTableMetadataChanged(r1, r1.getSSTableMetadata());
+ Assert.assertEquals(((SSTableMetadataChanged)listener.received.get(0)).sstable, r1);
+ Assert.assertEquals(r1.getSSTableMetadata(), ((SSTableMetadataChanged)listener.received.get(0)).oldMetadata);
+ listener.received.clear();
tracker.unsubscribe(listener);
MockListener failListener = new MockListener(true);
tracker.subscribe(failListener);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53c0ef17/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 13d3eac..fcc9191 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -172,8 +172,35 @@ public class LegacySSTableTest
{
for (SSTableReader sstable : cfs.getLiveSSTables())
{
- sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1234, UUID.randomUUID());
+ UUID random = UUID.randomUUID();
+ sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1234, random);
sstable.reloadSSTableMetadata();
+ assertEquals(1234, sstable.getRepairedAt());
+ if (sstable.descriptor.version.hasPendingRepair())
+ assertEquals(random, sstable.getPendingRepair());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testMutateLevel() throws Exception
+ {
+ // we need to make sure we write old version metadata in the format for that version
+ for (String legacyVersion : legacyVersions)
+ {
+ logger.info("Loading legacy version: {}", legacyVersion);
+ truncateLegacyTables(legacyVersion);
+ loadLegacyTables(legacyVersion);
+ CacheService.instance.invalidateKeyCache();
+
+ for (ColumnFamilyStore cfs : Keyspace.open("legacy_tables").getColumnFamilyStores())
+ {
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1234);
+ sstable.reloadSSTableMetadata();
+ assertEquals(1234, sstable.getSSTableLevel());
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org