You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/05/09 06:22:01 UTC
cassandra git commit: Automatic sstable upgrades
Repository: cassandra
Updated Branches:
refs/heads/trunk 6b0247576 -> d14a9266c
Automatic sstable upgrades
Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-14197
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d14a9266
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d14a9266
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d14a9266
Branch: refs/heads/trunk
Commit: d14a9266c7ddff0589fdbe7a1836217b8bb8b394
Parents: 6b02475
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Mar 15 09:25:23 2018 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed May 9 08:17:33 2018 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +
conf/cassandra.yaml | 6 +
.../org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 35 +++++
.../db/compaction/CompactionManager.java | 76 ++++++++++-
.../db/compaction/CompactionManagerMBean.java | 21 +++
.../compaction/CompactionStrategyManager.java | 35 ++++-
.../apache/cassandra/metrics/TableMetrics.java | 13 ++
.../org/apache/cassandra/tools/NodeProbe.java | 1 +
.../tools/nodetool/stats/StatsTable.java | 1 +
.../tools/nodetool/stats/TableStatsHolder.java | 1 +
.../tools/nodetool/stats/TableStatsPrinter.java | 1 +
.../CompactionStrategyManagerTest.java | 131 ++++++++++++++++++-
.../cassandra/io/sstable/LegacySSTableTest.java | 37 ++++++
.../nodetool/stats/TableStatsPrinterTest.java | 6 +
.../nodetool/stats/TableStatsTestBase.java | 1 +
17 files changed, 362 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25c237f..cad0e28 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Automatic sstable upgrades (CASSANDRA-14197)
* Replace deprecated junit.framework.Assert usages with org.junit.Assert (CASSANDRA-14431)
* cassandra-stress throws NPE if insert section isn't specified in user profile (CASSSANDRA-14426)
* List clients by protocol versions `nodetool clientstats --by-protocol` (CASSANDRA-14335)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index a13f633..4885a12 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,9 @@ using the provided 'sstableupgrade' tool.
New features
------------
+ - There is now an option to automatically upgrade sstables after a Cassandra upgrade. Enable it
+ either in `cassandra.yaml:automatic_sstable_upgrade` or via JMX at runtime. See
+ CASSANDRA-14197.
- `nodetool refresh` has been deprecated in favour of `nodetool import` - see CASSANDRA-6719
for details
- An experimental option to compare all merkle trees together has been added - for example, in
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 7e4b2c2..7cc9e32 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1178,3 +1178,9 @@ back_pressure_strategy:
# The full query log will recrusively delete the contents of this path at
# times. Don't place links in this directory to other parts of the filesystem.
#full_query_log_dir: /tmp/cassandrafullquerylog
+
+# Automatically upgrade sstables after upgrade - if there is no ordinary compaction to do, the
+# oldest non-upgraded sstable will get upgraded to the latest version
+# automatic_sstable_upgrade: false
+# Limit the number of concurrent sstable upgrades
+# max_concurrent_automatic_sstable_upgrades: 1
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index aa4b028..2c28796 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -377,6 +377,8 @@ public class Config
// parameters to adjust how much to delay startup until a certain amount of the cluster is connect to and marked alive
public int block_for_peers_percentage = 70;
public int block_for_peers_timeout_in_secs = 10;
+ public volatile boolean automatic_sstable_upgrade = false;
+ public volatile int max_concurrent_automatic_sstable_upgrades = 1;
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c738971..6b11974 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -748,6 +748,8 @@ public class DatabaseDescriptor
if (conf.otc_coalescing_enough_coalesced_messages <= 0)
throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
+
+ validateMaxConcurrentAutoUpgradeTasksConf(conf.max_concurrent_automatic_sstable_upgrades);
}
private static String storagedirFor(String type)
@@ -2544,4 +2546,37 @@ public class DatabaseDescriptor
{
return conf.block_for_peers_timeout_in_secs;
}
+
+ public static boolean automaticSSTableUpgrade()
+ {
+ return conf.automatic_sstable_upgrade;
+ }
+
+ public static void setAutomaticSSTableUpgradeEnabled(boolean enabled)
+ {
+ if (conf.automatic_sstable_upgrade != enabled)
+ logger.debug("Changing automatic_sstable_upgrade to {}", enabled);
+ conf.automatic_sstable_upgrade = enabled;
+ }
+
+ public static int maxConcurrentAutoUpgradeTasks()
+ {
+ return conf.max_concurrent_automatic_sstable_upgrades;
+ }
+
+ /**
+  * Updates max_concurrent_automatic_sstable_upgrades at runtime.
+  *
+  * @param value the new limit; must not be negative
+  * @throws ConfigurationException if the value is negative, in which case the current setting is kept
+  */
+ public static void setMaxConcurrentAutoUpgradeTasks(int value)
+ {
+     // validate first so that a rejected value is never logged as if the setting had been changed
+     validateMaxConcurrentAutoUpgradeTasksConf(value);
+     if (conf.max_concurrent_automatic_sstable_upgrades != value)
+         logger.debug("Changing max_concurrent_automatic_sstable_upgrades to {}", value);
+     conf.max_concurrent_automatic_sstable_upgrades = value;
+ }
+
+ private static void validateMaxConcurrentAutoUpgradeTasksConf(int value)
+ {
+ if (value < 0)
+ throw new ConfigurationException("max_concurrent_automatic_sstable_upgrades can't be negative");
+ if (value > getConcurrentCompactors())
+ logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors());
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 831d8ca..5602aab 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
@@ -41,6 +42,7 @@ import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.repair.ValidationPartitionIterator;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -94,6 +96,9 @@ public class CompactionManager implements CompactionManagerMBean
private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);
public static final CompactionManager instance;
+ @VisibleForTesting
+ public final AtomicInteger currentlyBackgroundUpgrading = new AtomicInteger(0);
+
public static final int NO_GC = Integer.MIN_VALUE;
public static final int GC_ALL = Integer.MAX_VALUE;
@@ -256,6 +261,7 @@ public class CompactionManager implements CompactionManagerMBean
// the actual sstables to compact are not determined until we run the BCT; that way, if new sstables
// are created between task submission and execution, we execute against the most up-to-date information
+ @VisibleForTesting
class BackgroundCompactionCandidate implements Runnable
{
private final ColumnFamilyStore cfs;
@@ -268,6 +274,7 @@ public class CompactionManager implements CompactionManagerMBean
public void run()
{
+ boolean ranCompaction = false;
try
{
logger.trace("Checking {}.{}", cfs.keyspace.getName(), cfs.name);
@@ -281,19 +288,53 @@ public class CompactionManager implements CompactionManagerMBean
AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds()));
if (task == null)
{
- logger.trace("No tasks available");
- return;
+ if (DatabaseDescriptor.automaticSSTableUpgrade())
+ ranCompaction = maybeRunUpgradeTask(strategy);
+ }
+ else
+ {
+ task.execute(metrics);
+ ranCompaction = true;
}
- task.execute(metrics);
}
finally
{
compactingCF.remove(cfs);
}
- submitBackground(cfs);
+ if (ranCompaction) // only submit background if we actually ran a compaction - otherwise we end up in an infinite loop submitting noop background tasks
+ submitBackground(cfs);
+ }
+
+ boolean maybeRunUpgradeTask(CompactionStrategyManager strategy)
+ {
+ logger.debug("Checking for upgrade tasks {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+ try
+ {
+ if (currentlyBackgroundUpgrading.incrementAndGet() <= DatabaseDescriptor.maxConcurrentAutoUpgradeTasks())
+ {
+ AbstractCompactionTask upgradeTask = strategy.findUpgradeSSTableTask();
+ if (upgradeTask != null)
+ {
+ upgradeTask.execute(metrics);
+ return true;
+ }
+ }
+ }
+ finally
+ {
+ currentlyBackgroundUpgrading.decrementAndGet();
+ }
+ logger.trace("No tasks available");
+ return false;
}
}
+ @VisibleForTesting
+ public BackgroundCompactionCandidate getBackgroundCompactionCandidate(ColumnFamilyStore cfs)
+ {
+ return new BackgroundCompactionCandidate(cfs);
+ }
+
/**
* Run an operation over all sstables using jobs threads
*
@@ -1834,6 +1875,33 @@ public class CompactionManager implements CompactionManagerMBean
viewBuildExecutor.setMaximumPoolSize(number);
}
+ public boolean getAutomaticSSTableUpgradeEnabled()
+ {
+ return DatabaseDescriptor.automaticSSTableUpgrade();
+ }
+
+ public void setAutomaticSSTableUpgradeEnabled(boolean enabled)
+ {
+ DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(enabled);
+ }
+
+ public int getMaxConcurrentAutoUpgradeTasks()
+ {
+ return DatabaseDescriptor.maxConcurrentAutoUpgradeTasks();
+ }
+
+ public void setMaxConcurrentAutoUpgradeTasks(int value)
+ {
+ try
+ {
+ DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(value);
+ }
+ catch (ConfigurationException e)
+ {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+
/**
* Try to stop all of the compactions for given ColumnFamilies.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index b98b371..e4d5392 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -138,4 +138,25 @@ public interface CompactionManagerMBean
* @param number New maximum of view build threads
*/
public void setMaximumViewBuildThreads(int number);
+
+ /**
+ * Get automatic sstable upgrade enabled
+ */
+ public boolean getAutomaticSSTableUpgradeEnabled();
+ /**
+ * Set if automatic sstable upgrade should be enabled
+ */
+ public void setAutomaticSSTableUpgradeEnabled(boolean enabled);
+
+ /**
+ * Get the number of concurrent sstable upgrade tasks we should run
+ * when automatic sstable upgrades are enabled
+ */
+ public int getMaxConcurrentAutoUpgradeTasks();
+
+ /**
+ * Set the number of concurrent sstable upgrade tasks we should run
+ * when automatic sstable upgrades are enabled
+ */
+ public void setMaxConcurrentAutoUpgradeTasks(int value);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 61f0630..c77ed92 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.compaction;
+import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
@@ -29,7 +30,9 @@ import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DiskBoundaries;
import org.apache.cassandra.index.Index;
@@ -38,12 +41,12 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -202,6 +205,36 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
+ /**
+  * Finds the oldest (by modification date of the data file) non-latest-version sstable on disk and creates
+  * an upgrade task for it.
+  *
+  * @return an upgrade task for the oldest candidate sstable for which a transaction could be obtained, or
+  *         null if this manager is not enabled, automatic sstable upgrades are switched off, or no candidate
+  *         is available
+  */
+ @VisibleForTesting
+ AbstractCompactionTask findUpgradeSSTableTask()
+ {
+     if (!isEnabled() || !DatabaseDescriptor.automaticSSTableUpgrade())
+         return null;
+     Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
+     // Read every modification time exactly once, before sorting: File.lastModified() goes to the file system
+     // and returns 0 for a file that no longer exists, so calling it from inside the comparator is both
+     // expensive and able to produce an inconsistent ordering while the sort is running.
+     Map<SSTableReader, Long> lastModified = new LinkedHashMap<>();
+     for (SSTableReader candidate : cfs.getLiveSSTables())
+     {
+         if (!compacting.contains(candidate) && !candidate.descriptor.version.isLatestVersion())
+             lastModified.put(candidate, new File(candidate.descriptor.filenameFor(Component.DATA)).lastModified());
+     }
+     List<SSTableReader> potentialUpgrade = new ArrayList<>(lastModified.keySet());
+     potentialUpgrade.sort((o1, o2) -> Longs.compare(lastModified.get(o1), lastModified.get(o2)));
+     for (SSTableReader sstable : potentialUpgrade)
+     {
+         // tryModify returns null when the sstable cannot be claimed; move on to the next-oldest candidate
+         LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.UPGRADE_SSTABLES);
+         if (txn != null)
+         {
+             logger.debug("Running automatic sstable upgrade for {}", sstable);
+             return getCompactionStrategyFor(sstable).getCompactionTask(txn, Integer.MIN_VALUE, Long.MAX_VALUE);
+         }
+     }
+     return null;
+ }
+
+
public boolean isEnabled()
{
return enabled && isActive;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index d8cb18e..c0e7f7b 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -95,6 +95,8 @@ public class TableMetrics
public final Gauge<Integer> pendingCompactions;
/** Number of SSTables on disk for this CF */
public final Gauge<Integer> liveSSTableCount;
+ /** Number of SSTables with old version on disk for this CF */
+ public final Gauge<Integer> oldVersionSSTableCount;
/** Disk space used by SSTables belonging to this table */
public final Counter liveDiskSpaceUsed;
/** Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd */
@@ -541,6 +543,17 @@ public class TableMetrics
return cfs.getTracker().getView().liveSSTables().size();
}
});
+ oldVersionSSTableCount = createTableGauge("OldVersionSSTableCount", new Gauge<Integer>()
+ {
+ public Integer getValue()
+ {
+ int count = 0;
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ if (!sstable.descriptor.version.isLatestVersion())
+ count++;
+ return count;
+ }
+ });
liveDiskSpaceUsed = createTableCounter("LiveDiskSpaceUsed");
totalDiskSpaceUsed = createTableCounter("TotalDiskSpaceUsed");
minPartitionSize = createTableGauge("MinPartitionSize", "MinRowSize", new Gauge<Long>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 4fdb563..e221e11 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1399,6 +1399,7 @@ public class NodeProbe implements AutoCloseable
case "EstimatedPartitionCount":
case "KeyCacheHitRate":
case "LiveSSTableCount":
+ case "OldVersionSSTableCount":
case "MaxPartitionSize":
case "MeanPartitionSize":
case "MemtableColumnsCount":
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
index 908c856..01d2164 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
@@ -29,6 +29,7 @@ public class StatsTable
public boolean isIndex;
public boolean isLeveledSstable = false;
public Object sstableCount;
+ public Object oldSSTableCount;
public String spaceUsedLive;
public String spaceUsedTotal;
public String spaceUsedBySnapshotsTotal;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
index 79531c1..624484f 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
@@ -209,6 +209,7 @@ public class TableStatsHolder implements StatsHolder
statsTable.tableName = tableName;
statsTable.isIndex = tableName.contains(".");
statsTable.sstableCount = probe.getColumnFamilyMetric(keyspaceName, tableName, "LiveSSTableCount");
+ statsTable.oldSSTableCount = probe.getColumnFamilyMetric(keyspaceName, tableName, "OldVersionSSTableCount");
int[] leveledSStables = table.getSSTableCountPerLevel();
if (leveledSStables != null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
index 4ea7562..b166803 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
@@ -75,6 +75,7 @@ public class TableStatsPrinter
{
out.println(indent + "Table" + (table.isIndex ? " (index): " + table.tableName : ": ") + tableDisplayName);
out.println(indent + "SSTable count: " + table.sstableCount);
+ out.println(indent + "Old SSTable count: " + table.oldSSTableCount);
if (table.isLeveledSstable)
out.println(indent + "SSTables in each level: [" + String.join(", ",
table.sstablesInEachLevel) + "]");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
index c315fb9..6f2551d 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
@@ -76,6 +78,10 @@ public class CompactionStrategyManagerTest
* disk assignment based on its generation - See {@link this#getSSTableIndex(Integer[], SSTableReader)}
*/
originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+ SchemaLoader.createKeyspace(KS_PREFIX,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
+ .compaction(CompactionParams.scts(Collections.emptyMap())));
}
@AfterClass
@@ -90,10 +96,6 @@ public class CompactionStrategyManagerTest
{
// Creates 100 SSTables with keys 0-99
int numSSTables = 100;
- SchemaLoader.createKeyspace(KS_PREFIX,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
- .compaction(CompactionParams.scts(Collections.emptyMap())));
ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
cfs.disableAutoCompaction();
Set<SSTableReader> previousSSTables = cfs.getLiveSSTables();
@@ -177,6 +179,81 @@ public class CompactionStrategyManagerTest
}
}
+
+
+ @Test
+ public void testAutomaticUpgradeConcurrency() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+ DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true);
+ DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(1);
+
+ // latch to block CompactionManager.BackgroundCompactionCandidate#maybeRunUpgradeTask
+ // inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 1 this will make
+ // sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicInteger upgradeTaskCount = new AtomicInteger(0);
+ MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount);
+
+ CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock);
+ CompactionStrategyManager mgr = mock.getCompactionStrategyManager();
+ // basic idea is that we start a thread which will be able to get in to the currentlyBackgroundUpgrading-guarded
+ // code in CompactionManager, then we try to run a bunch more of the upgrade tasks which should return false
+ // due to the currentlyBackgroundUpgrading count being >= max_concurrent_auto_upgrade_tasks
+ Thread t = new Thread(() -> r.maybeRunUpgradeTask(mgr));
+ t.start();
+ Thread.sleep(100); // let the thread start and grab the task
+ assertEquals(1, CompactionManager.instance.currentlyBackgroundUpgrading.get());
+ assertFalse(r.maybeRunUpgradeTask(mgr));
+ assertFalse(r.maybeRunUpgradeTask(mgr));
+ latch.countDown();
+ t.join();
+ assertEquals(1, upgradeTaskCount.get()); // we should only call findUpgradeSSTableTask once when concurrency = 1
+ assertEquals(0, CompactionManager.instance.currentlyBackgroundUpgrading.get());
+
+ DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false);
+ }
+
+ @Test
+ public void testAutomaticUpgradeConcurrency2() throws Exception
+ {
+ ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+ DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true);
+ DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(2);
+ // latch to block CompactionManager.BackgroundCompactionCandidate#maybeRunUpgradeTask
+ // inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 2 this will make
+ // sure that, once two threads are blocked there, any further BackgroundCompactionCandidate#maybeRunUpgradeTask
+ // call returns false until the latch has been counted down
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicInteger upgradeTaskCount = new AtomicInteger();
+ MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount);
+
+ CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock);
+ CompactionStrategyManager mgr = mock.getCompactionStrategyManager();
+
+ // basic idea is that we start 2 threads who will be able to get in to the currentlyBackgroundUpgrading-guarded
+ // code in CompactionManager, then we try to run a bunch more of the upgrade task which should return false
+ // due to the currentlyBackgroundUpgrading count being >= max_concurrent_auto_upgrade_tasks
+ Thread t = new Thread(() -> r.maybeRunUpgradeTask(mgr));
+ t.start();
+ Thread t2 = new Thread(() -> r.maybeRunUpgradeTask(mgr));
+ t2.start();
+ Thread.sleep(100); // let the threads start and grab the task
+ assertEquals(2, CompactionManager.instance.currentlyBackgroundUpgrading.get());
+ assertFalse(r.maybeRunUpgradeTask(mgr));
+ assertFalse(r.maybeRunUpgradeTask(mgr));
+ assertFalse(r.maybeRunUpgradeTask(mgr));
+ assertEquals(2, CompactionManager.instance.currentlyBackgroundUpgrading.get());
+ latch.countDown();
+ t.join();
+ t2.join();
+ assertEquals(2, upgradeTaskCount.get());
+ assertEquals(0, CompactionManager.instance.currentlyBackgroundUpgrading.get());
+
+ DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(1);
+ DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false);
+ }
+
+
private MockCFS createJBODMockCFS(int disks)
{
// Create #disks data directories to simulate JBOD
@@ -306,4 +383,50 @@ public class CompactionStrategyManagerTest
super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
}
}
+
+ private static class MockCFSForCSM extends ColumnFamilyStore
+ {
+ private final CountDownLatch latch;
+ private final AtomicInteger upgradeTaskCount;
+
+ private MockCFSForCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount)
+ {
+ super(cfs.keyspace, cfs.name, 10, cfs.metadata, cfs.getDirectories(), true, false, false);
+ this.latch = latch;
+ this.upgradeTaskCount = upgradeTaskCount;
+ }
+ @Override
+ public CompactionStrategyManager getCompactionStrategyManager()
+ {
+ return new MockCSM(this, latch, upgradeTaskCount);
+ }
+ }
+
+ /**
+  * CompactionStrategyManager for the concurrency tests: its upgrade-task lookup waits on a latch, counts
+  * each completed lookup and never produces a task.
+  */
+ private static class MockCSM extends CompactionStrategyManager
+ {
+     private final CountDownLatch latch;
+     private final AtomicInteger upgradeTaskCount;
+
+     private MockCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount)
+     {
+         super(cfs);
+         this.latch = latch;
+         this.upgradeTaskCount = upgradeTaskCount;
+     }
+
+     /**
+      * Blocks until the latch has been counted down, then records the call.
+      *
+      * @return always null, i.e. "no upgrade task available"
+      */
+     @Override
+     public AbstractCompactionTask findUpgradeSSTableTask()
+     {
+         try
+         {
+             latch.await();
+             upgradeTaskCount.incrementAndGet();
+         }
+         catch (InterruptedException e)
+         {
+             // restore the interrupt flag so the interrupted thread can still observe it
+             Thread.currentThread().interrupt();
+             throw new RuntimeException(e);
+         }
+         return null;
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 8dd8197..13d3eac 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -42,6 +42,8 @@ import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.AbstractCompactionTask;
+import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Verifier;
import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
import org.apache.cassandra.dht.IPartitioner;
@@ -61,6 +63,8 @@ import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -234,6 +238,39 @@ public class LegacySSTableTest
}
}
+ @Test
+ public void testAutomaticUpgrade() throws Exception
+ {
+ for (String legacyVersion : legacyVersions)
+ {
+ logger.info("Loading legacy version: {}", legacyVersion);
+ truncateLegacyTables(legacyVersion);
+ loadLegacyTables(legacyVersion);
+ ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion));
+ AbstractCompactionTask act = cfs.getCompactionStrategyManager().getNextBackgroundTask(0);
+ // there should be no compactions to run with auto upgrades disabled:
+ assertEquals(null, act);
+ }
+
+ DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true);
+ for (String legacyVersion : legacyVersions)
+ {
+ logger.info("Loading legacy version: {}", legacyVersion);
+ truncateLegacyTables(legacyVersion);
+ loadLegacyTables(legacyVersion);
+ ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion));
+ if (cfs.getLiveSSTables().stream().anyMatch(s -> !s.descriptor.version.isLatestVersion()))
+ assertTrue(cfs.metric.oldVersionSSTableCount.getValue() > 0);
+ while (cfs.getLiveSSTables().stream().anyMatch(s -> !s.descriptor.version.isLatestVersion()))
+ {
+ CompactionManager.instance.submitBackground(cfs);
+ Thread.sleep(100);
+ }
+ assertTrue(cfs.metric.oldVersionSSTableCount.getValue() == 0);
+ }
+ DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false);
+ }
+
private void streamLegacyTables(String legacyVersion) throws Exception
{
logger.info("Streaming legacy version {}", legacyVersion);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java
index 26b2ff6..32b0b62 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java
@@ -35,6 +35,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
public static final String expectedDefaultTable1Output =
"\tTable: %s\n" +
"\tSSTable count: 60000\n" +
+ "\tOld SSTable count: 0\n" +
"\tSpace used (live): 0\n" +
"\tSpace used (total): 9001\n" +
"\tSpace used by snapshots (total): 1111\n" +
@@ -65,6 +66,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
public static final String expectedDefaultTable2Output =
"\tTable: %s\n" +
"\tSSTable count: 3000\n" +
+ "\tOld SSTable count: 0\n" +
"\tSpace used (live): 22\n" +
"\tSpace used (total): 1024\n" +
"\tSpace used by snapshots (total): 222\n" +
@@ -100,6 +102,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
public static final String expectedDefaultTable3Output =
"\tTable: %s\n" +
"\tSSTable count: 50000\n" +
+ "\tOld SSTable count: 0\n" +
"\tSpace used (live): 0\n" +
"\tSpace used (total): 512\n" +
"\tSpace used by snapshots (total): 0\n" +
@@ -130,6 +133,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
public static final String expectedDefaultTable4Output =
"\tTable: %s\n" +
"\tSSTable count: 2000\n" +
+ "\tOld SSTable count: 0\n" +
"\tSpace used (live): 4444\n" +
"\tSpace used (total): 256\n" +
"\tSpace used by snapshots (total): 44\n" +
@@ -165,6 +169,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
public static final String expectedDefaultTable5Output =
"\tTable: %s\n" +
"\tSSTable count: 40000\n" +
+ "\tOld SSTable count: 0\n" +
"\tSpace used (live): 55555\n" +
"\tSpace used (total): 64\n" +
"\tSpace used by snapshots (total): 55555\n" +
@@ -195,6 +200,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
public static final String expectedDefaultTable6Output =
"\tTable: %s\n" +
"\tSSTable count: 1000\n" +
+ "\tOld SSTable count: 0\n" +
"\tSpace used (live): 666666\n" +
"\tSpace used (total): 0\n" +
"\tSpace used by snapshots (total): 0\n" +
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java
index bb56ef8..b2f1663 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java
@@ -70,6 +70,7 @@ public class TableStatsTestBase
template.tableName = new String(tableName);
template.isIndex = false;
template.sstableCount = 0L;
+ template.oldSSTableCount = 0L;
template.spaceUsedLive = "0";
template.spaceUsedTotal = "0";
template.spaceUsedBySnapshotsTotal = "0";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org