You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/05/09 06:22:01 UTC

cassandra git commit: Automatic sstable upgrades

Repository: cassandra
Updated Branches:
  refs/heads/trunk 6b0247576 -> d14a9266c


Automatic sstable upgrades

Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-14197


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d14a9266
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d14a9266
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d14a9266

Branch: refs/heads/trunk
Commit: d14a9266c7ddff0589fdbe7a1836217b8bb8b394
Parents: 6b02475
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Mar 15 09:25:23 2018 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed May 9 08:17:33 2018 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +
 conf/cassandra.yaml                             |   6 +
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |  35 +++++
 .../db/compaction/CompactionManager.java        |  76 ++++++++++-
 .../db/compaction/CompactionManagerMBean.java   |  21 +++
 .../compaction/CompactionStrategyManager.java   |  35 ++++-
 .../apache/cassandra/metrics/TableMetrics.java  |  13 ++
 .../org/apache/cassandra/tools/NodeProbe.java   |   1 +
 .../tools/nodetool/stats/StatsTable.java        |   1 +
 .../tools/nodetool/stats/TableStatsHolder.java  |   1 +
 .../tools/nodetool/stats/TableStatsPrinter.java |   1 +
 .../CompactionStrategyManagerTest.java          | 131 ++++++++++++++++++-
 .../cassandra/io/sstable/LegacySSTableTest.java |  37 ++++++
 .../nodetool/stats/TableStatsPrinterTest.java   |   6 +
 .../nodetool/stats/TableStatsTestBase.java      |   1 +
 17 files changed, 362 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25c237f..cad0e28 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Automatic sstable upgrades (CASSANDRA-14197)
  * Replace deprecated junit.framework.Assert usages with org.junit.Assert (CASSANDRA-14431)
  * cassandra-stress throws NPE if insert section isn't specified in user profile (CASSSANDRA-14426)
  * List clients by protocol versions `nodetool clientstats --by-protocol` (CASSANDRA-14335)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index a13f633..4885a12 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -38,6 +38,9 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - There is now an option to automatically upgrade sstables after Cassandra upgrade, enable
+     either in `cassandra.yaml:automatic_sstable_upgrade` or via JMX during runtime. See
+     CASSANDRA-14197.
    - `nodetool refresh` has been deprecated in favour of `nodetool import` - see CASSANDRA-6719
      for details
    - An experimental option to compare all merkle trees together has been added - for example, in

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 7e4b2c2..7cc9e32 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1178,3 +1178,9 @@ back_pressure_strategy:
 # The full query log will recrusively delete the contents of this path at
 # times. Don't place links in this directory to other parts of the filesystem.
 #full_query_log_dir: /tmp/cassandrafullquerylog
+
+# Automatically upgrade sstables after upgrade - if there is no ordinary compaction to do, the
+# oldest non-upgraded sstable will get upgraded to the latest version
+# automatic_sstable_upgrade: false
+# Limit the number of concurrent sstable upgrades
+# max_concurrent_automatic_sstable_upgrades: 1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index aa4b028..2c28796 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -377,6 +377,8 @@ public class Config
     // parameters to adjust how much to delay startup until a certain amount of the cluster is connect to and marked alive
     public int block_for_peers_percentage = 70;
     public int block_for_peers_timeout_in_secs = 10;
+    public volatile boolean automatic_sstable_upgrade = false;
+    public volatile int max_concurrent_automatic_sstable_upgrades = 1;
 
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c738971..6b11974 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -748,6 +748,8 @@ public class DatabaseDescriptor
 
         if (conf.otc_coalescing_enough_coalesced_messages <= 0)
             throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false);
+
+        validateMaxConcurrentAutoUpgradeTasksConf(conf.max_concurrent_automatic_sstable_upgrades);
     }
 
     private static String storagedirFor(String type)
@@ -2544,4 +2546,37 @@ public class DatabaseDescriptor
     {
         return conf.block_for_peers_timeout_in_secs;
     }
+
+    public static boolean automaticSSTableUpgrade()
+    {
+        return conf.automatic_sstable_upgrade;
+    }
+
+    public static void setAutomaticSSTableUpgradeEnabled(boolean enabled)
+    {
+        if (conf.automatic_sstable_upgrade != enabled)
+            logger.debug("Changing automatic_sstable_upgrade to {}", enabled);
+        conf.automatic_sstable_upgrade = enabled;
+    }
+
+    public static int maxConcurrentAutoUpgradeTasks()
+    {
+        return conf.max_concurrent_automatic_sstable_upgrades;
+    }
+
+    public static void setMaxConcurrentAutoUpgradeTasks(int value)
+    {
+        if (conf.max_concurrent_automatic_sstable_upgrades != value)
+            logger.debug("Changing max_concurrent_automatic_sstable_upgrades to {}", value);
+        validateMaxConcurrentAutoUpgradeTasksConf(value);
+        conf.max_concurrent_automatic_sstable_upgrades = value;
+    }
+
+    private static void validateMaxConcurrentAutoUpgradeTasksConf(int value)
+    {
+        if (value < 0)
+            throw new ConfigurationException("max_concurrent_automatic_sstable_upgrades can't be negative");
+        if (value > getConcurrentCompactors())
+            logger.warn("max_concurrent_automatic_sstable_upgrades ({}) is larger than concurrent_compactors ({})", value, getConcurrentCompactors());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 831d8ca..5602aab 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.LongPredicate;
 import java.util.stream.Collectors;
 import javax.management.MBeanServer;
@@ -41,6 +42,7 @@ import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.repair.ValidationPartitionIterator;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -94,6 +96,9 @@ public class CompactionManager implements CompactionManagerMBean
     private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);
     public static final CompactionManager instance;
 
+    @VisibleForTesting
+    public final AtomicInteger currentlyBackgroundUpgrading = new AtomicInteger(0);
+
     public static final int NO_GC = Integer.MIN_VALUE;
     public static final int GC_ALL = Integer.MAX_VALUE;
 
@@ -256,6 +261,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     // the actual sstables to compact are not determined until we run the BCT; that way, if new sstables
     // are created between task submission and execution, we execute against the most up-to-date information
+    @VisibleForTesting
     class BackgroundCompactionCandidate implements Runnable
     {
         private final ColumnFamilyStore cfs;
@@ -268,6 +274,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         public void run()
         {
+            boolean ranCompaction = false;
             try
             {
                 logger.trace("Checking {}.{}", cfs.keyspace.getName(), cfs.name);
@@ -281,19 +288,53 @@ public class CompactionManager implements CompactionManagerMBean
                 AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs, FBUtilities.nowInSeconds()));
                 if (task == null)
                 {
-                    logger.trace("No tasks available");
-                    return;
+                    if (DatabaseDescriptor.automaticSSTableUpgrade())
+                        ranCompaction = maybeRunUpgradeTask(strategy);
+                }
+                else
+                {
+                    task.execute(metrics);
+                    ranCompaction = true;
                 }
-                task.execute(metrics);
             }
             finally
             {
                 compactingCF.remove(cfs);
             }
-            submitBackground(cfs);
+            if (ranCompaction) // only submit background if we actually ran a compaction - otherwise we end up in an infinite loop submitting noop background tasks
+                submitBackground(cfs);
+        }
+
+        boolean maybeRunUpgradeTask(CompactionStrategyManager strategy)
+        {
+            logger.debug("Checking for upgrade tasks {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+            try
+            {
+                if (currentlyBackgroundUpgrading.incrementAndGet() <= DatabaseDescriptor.maxConcurrentAutoUpgradeTasks())
+                {
+                    AbstractCompactionTask upgradeTask = strategy.findUpgradeSSTableTask();
+                    if (upgradeTask != null)
+                    {
+                        upgradeTask.execute(metrics);
+                        return true;
+                    }
+                }
+            }
+            finally
+            {
+                currentlyBackgroundUpgrading.decrementAndGet();
+            }
+            logger.trace("No tasks available");
+            return false;
         }
     }
 
+    @VisibleForTesting
+    public BackgroundCompactionCandidate getBackgroundCompactionCandidate(ColumnFamilyStore cfs)
+    {
+        return new BackgroundCompactionCandidate(cfs);
+    }
+
     /**
      * Run an operation over all sstables using jobs threads
      *
@@ -1834,6 +1875,33 @@ public class CompactionManager implements CompactionManagerMBean
         viewBuildExecutor.setMaximumPoolSize(number);
     }
 
+    public boolean getAutomaticSSTableUpgradeEnabled()
+    {
+        return DatabaseDescriptor.automaticSSTableUpgrade();
+    }
+
+    public void setAutomaticSSTableUpgradeEnabled(boolean enabled)
+    {
+        DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(enabled);
+    }
+
+    public int getMaxConcurrentAutoUpgradeTasks()
+    {
+        return DatabaseDescriptor.maxConcurrentAutoUpgradeTasks();
+    }
+
+    public void setMaxConcurrentAutoUpgradeTasks(int value)
+    {
+        try
+        {
+            DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(value);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
     /**
      * Try to stop all of the compactions for given ColumnFamilies.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index b98b371..e4d5392 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -138,4 +138,25 @@ public interface CompactionManagerMBean
      * @param number New maximum of view build threads
      */
     public void setMaximumViewBuildThreads(int number);
+
+    /**
+     * Get automatic sstable upgrade enabled
+     */
+    public boolean getAutomaticSSTableUpgradeEnabled();
+    /**
+     * Set if automatic sstable upgrade should be enabled
+     */
+    public void setAutomaticSSTableUpgradeEnabled(boolean enabled);
+
+    /**
+     * Get the number of concurrent sstable upgrade tasks we should run
+     * when automatic sstable upgrades are enabled
+     */
+    public int getMaxConcurrentAutoUpgradeTasks();
+
+    /**
+     * Set the number of concurrent sstable upgrade tasks we should run
+     * when automatic sstable upgrades are enabled
+     */
+    public void setMaxConcurrentAutoUpgradeTasks(int value);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 61f0630..c77ed92 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction;
 
 
+import java.io.File;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.Callable;
@@ -29,7 +30,9 @@ import java.util.stream.Stream;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DiskBoundaries;
 import org.apache.cassandra.index.Index;
 
@@ -38,12 +41,12 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -202,6 +205,36 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
+    /**
+     * finds the oldest (by modification date) non-latest-version sstable on disk and creates an upgrade task for it
+     * @return
+     */
+    @VisibleForTesting
+    AbstractCompactionTask findUpgradeSSTableTask()
+    {
+        if (!isEnabled() || !DatabaseDescriptor.automaticSSTableUpgrade())
+            return null;
+        Set<SSTableReader> compacting = cfs.getTracker().getCompacting();
+        List<SSTableReader> potentialUpgrade = cfs.getLiveSSTables()
+                                                  .stream()
+                                                  .filter(s -> !compacting.contains(s) && !s.descriptor.version.isLatestVersion())
+                                                  .sorted((o1, o2) -> {
+                                                      File f1 = new File(o1.descriptor.filenameFor(Component.DATA));
+                                                      File f2 = new File(o2.descriptor.filenameFor(Component.DATA));
+                                                      return Longs.compare(f1.lastModified(), f2.lastModified());
+                                                  }).collect(Collectors.toList());
+        for (SSTableReader sstable : potentialUpgrade)
+        {
+            LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.UPGRADE_SSTABLES);
+            if (txn != null)
+            {
+                logger.debug("Running automatic sstable upgrade for {}", sstable);
+                return getCompactionStrategyFor(sstable).getCompactionTask(txn, Integer.MIN_VALUE, Long.MAX_VALUE);
+            }
+        }
+        return null;
+    }
+
     public boolean isEnabled()
     {
         return enabled && isActive;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index d8cb18e..c0e7f7b 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -95,6 +95,8 @@ public class TableMetrics
     public final Gauge<Integer> pendingCompactions;
     /** Number of SSTables on disk for this CF */
     public final Gauge<Integer> liveSSTableCount;
+    /** Number of SSTables with old version on disk for this CF */
+    public final Gauge<Integer> oldVersionSSTableCount;
     /** Disk space used by SSTables belonging to this table */
     public final Counter liveDiskSpaceUsed;
     /** Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd */
@@ -541,6 +543,17 @@ public class TableMetrics
                 return cfs.getTracker().getView().liveSSTables().size();
             }
         });
+        oldVersionSSTableCount = createTableGauge("OldVersionSSTableCount", new Gauge<Integer>()
+        {
+            public Integer getValue()
+            {
+                int count = 0;
+                for (SSTableReader sstable : cfs.getLiveSSTables())
+                    if (!sstable.descriptor.version.isLatestVersion())
+                        count++;
+                return count;
+            }
+        });
         liveDiskSpaceUsed = createTableCounter("LiveDiskSpaceUsed");
         totalDiskSpaceUsed = createTableCounter("TotalDiskSpaceUsed");
         minPartitionSize = createTableGauge("MinPartitionSize", "MinRowSize", new Gauge<Long>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 4fdb563..e221e11 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1399,6 +1399,7 @@ public class NodeProbe implements AutoCloseable
                 case "EstimatedPartitionCount":
                 case "KeyCacheHitRate":
                 case "LiveSSTableCount":
+                case "OldVersionSSTableCount":
                 case "MaxPartitionSize":
                 case "MeanPartitionSize":
                 case "MemtableColumnsCount":

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
index 908c856..01d2164 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/StatsTable.java
@@ -29,6 +29,7 @@ public class StatsTable
     public boolean isIndex;
     public boolean isLeveledSstable = false;
     public Object sstableCount;
+    public Object oldSSTableCount;
     public String spaceUsedLive;
     public String spaceUsedTotal;
     public String spaceUsedBySnapshotsTotal;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
index 79531c1..624484f 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java
@@ -209,6 +209,7 @@ public class TableStatsHolder implements StatsHolder
                 statsTable.tableName = tableName;
                 statsTable.isIndex = tableName.contains(".");
                 statsTable.sstableCount = probe.getColumnFamilyMetric(keyspaceName, tableName, "LiveSSTableCount");
+                statsTable.oldSSTableCount = probe.getColumnFamilyMetric(keyspaceName, tableName, "OldVersionSSTableCount");
                 int[] leveledSStables = table.getSSTableCountPerLevel();
                 if (leveledSStables != null)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
index 4ea7562..b166803 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinter.java
@@ -75,6 +75,7 @@ public class TableStatsPrinter
         {
             out.println(indent + "Table" + (table.isIndex ? " (index): " + table.tableName : ": ") + tableDisplayName);
             out.println(indent + "SSTable count: " + table.sstableCount);
+            out.println(indent + "Old SSTable count: " + table.oldSSTableCount);
             if (table.isLeveledSstable)
                 out.println(indent + "SSTables in each level: [" + String.join(", ",
                                                                           table.sstablesInEachLevel) + "]");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
index c315fb9..6f2551d 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
@@ -76,6 +78,10 @@ public class CompactionStrategyManagerTest
          * disk assignment based on its generation - See {@link this#getSSTableIndex(Integer[], SSTableReader)}
          */
         originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+        SchemaLoader.createKeyspace(KS_PREFIX,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
+                                                .compaction(CompactionParams.scts(Collections.emptyMap())));
     }
 
     @AfterClass
@@ -90,10 +96,6 @@ public class CompactionStrategyManagerTest
     {
         // Creates 100 SSTables with keys 0-99
         int numSSTables = 100;
-        SchemaLoader.createKeyspace(KS_PREFIX,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
-                                                .compaction(CompactionParams.scts(Collections.emptyMap())));
         ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
         cfs.disableAutoCompaction();
         Set<SSTableReader> previousSSTables = cfs.getLiveSSTables();
@@ -177,6 +179,81 @@ public class CompactionStrategyManagerTest
         }
     }
 
+
+
+    @Test
+    public void testAutomaticUpgradeConcurrency() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+        DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true);
+        DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(1);
+
+        // latch to block CompactionManager.BackgroundCompactionCandidate#maybeRunUpgradeTask
+        // inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 1 this will make
+        // sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicInteger upgradeTaskCount = new AtomicInteger(0);
+        MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount);
+
+        CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock);
+        CompactionStrategyManager mgr = mock.getCompactionStrategyManager();
+        // basic idea is that we start a thread which will be able to get in to the currentlyBackgroundUpgrading-guarded
+        // code in CompactionManager, then we try to run a bunch more of the upgrade tasks which should return false
+        // due to the currentlyBackgroundUpgrading count being >= max_concurrent_auto_upgrade_tasks
+        Thread t = new Thread(() -> r.maybeRunUpgradeTask(mgr));
+        t.start();
+        Thread.sleep(100); // let the thread start and grab the task
+        assertEquals(1, CompactionManager.instance.currentlyBackgroundUpgrading.get());
+        assertFalse(r.maybeRunUpgradeTask(mgr));
+        assertFalse(r.maybeRunUpgradeTask(mgr));
+        latch.countDown();
+        t.join();
+        assertEquals(1, upgradeTaskCount.get()); // we should only call findUpgradeSSTableTask once when concurrency = 1
+        assertEquals(0, CompactionManager.instance.currentlyBackgroundUpgrading.get());
+
+        DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false);
+    }
+
+    @Test
+    public void testAutomaticUpgradeConcurrency2() throws Exception
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+        DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true);
+        DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(2);
+        // latch to block CompactionManager.BackgroundCompactionCandidate#maybeRunUpgradeTask
+        // inside the currentlyBackgroundUpgrading check - with max_concurrent_auto_upgrade_tasks = 1 this will make
+        // sure that BackgroundCompactionCandidate#maybeRunUpgradeTask returns false until the latch has been counted down
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicInteger upgradeTaskCount = new AtomicInteger();
+        MockCFSForCSM mock = new MockCFSForCSM(cfs, latch, upgradeTaskCount);
+
+        CompactionManager.BackgroundCompactionCandidate r = CompactionManager.instance.getBackgroundCompactionCandidate(mock);
+        CompactionStrategyManager mgr = mock.getCompactionStrategyManager();
+
+        // basic idea is that we start 2 threads who will be able to get in to the currentlyBackgroundUpgrading-guarded
+        // code in CompactionManager, then we try to run a bunch more of the upgrade task which should return false
+        // due to the currentlyBackgroundUpgrading count being >= max_concurrent_auto_upgrade_tasks
+        Thread t = new Thread(() -> r.maybeRunUpgradeTask(mgr));
+        t.start();
+        Thread t2 = new Thread(() -> r.maybeRunUpgradeTask(mgr));
+        t2.start();
+        Thread.sleep(100); // let the threads start and grab the task
+        assertEquals(2, CompactionManager.instance.currentlyBackgroundUpgrading.get());
+        assertFalse(r.maybeRunUpgradeTask(mgr));
+        assertFalse(r.maybeRunUpgradeTask(mgr));
+        assertFalse(r.maybeRunUpgradeTask(mgr));
+        assertEquals(2, CompactionManager.instance.currentlyBackgroundUpgrading.get());
+        latch.countDown();
+        t.join();
+        t2.join();
+        assertEquals(2, upgradeTaskCount.get());
+        assertEquals(0, CompactionManager.instance.currentlyBackgroundUpgrading.get());
+
+        DatabaseDescriptor.setMaxConcurrentAutoUpgradeTasks(1);
+        DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false);
+    }
+
+
     private MockCFS createJBODMockCFS(int disks)
     {
         // Create #disks data directories to simulate JBOD
@@ -306,4 +383,50 @@ public class CompactionStrategyManagerTest
             super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
         }
     }
+
+    private static class MockCFSForCSM extends ColumnFamilyStore
+    {
+        private final CountDownLatch latch;
+        private final AtomicInteger upgradeTaskCount;
+
+        private MockCFSForCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount)
+        {
+            super(cfs.keyspace, cfs.name, 10, cfs.metadata, cfs.getDirectories(), true, false, false);
+            this.latch = latch;
+            this.upgradeTaskCount = upgradeTaskCount;
+        }
+        @Override
+        public CompactionStrategyManager getCompactionStrategyManager()
+        {
+            return new MockCSM(this, latch, upgradeTaskCount);
+        }
+    }
+
+    private static class MockCSM extends CompactionStrategyManager
+    {
+        private final CountDownLatch latch;
+        private final AtomicInteger upgradeTaskCount;
+
+        private MockCSM(ColumnFamilyStore cfs, CountDownLatch latch, AtomicInteger upgradeTaskCount)
+        {
+            super(cfs);
+            this.latch = latch;
+            this.upgradeTaskCount = upgradeTaskCount;
+        }
+
+        @Override
+        public AbstractCompactionTask findUpgradeSSTableTask()
+        {
+            try
+            {
+                latch.await();
+                upgradeTaskCount.incrementAndGet();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 8dd8197..13d3eac 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -42,6 +42,8 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.AbstractCompactionTask;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.Verifier;
 import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
 import org.apache.cassandra.dht.IPartitioner;
@@ -61,6 +63,8 @@ import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -234,6 +238,39 @@ public class LegacySSTableTest
         }
     }
 
+    @Test
+    public void testAutomaticUpgrade() throws Exception
+    {
+        for (String legacyVersion : legacyVersions)
+        {
+            logger.info("Loading legacy version: {}", legacyVersion);
+            truncateLegacyTables(legacyVersion);
+            loadLegacyTables(legacyVersion);
+            ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion));
+            AbstractCompactionTask act = cfs.getCompactionStrategyManager().getNextBackgroundTask(0);
+            // there should be no compactions to run with auto upgrades disabled:
+            assertEquals(null, act);
+        }
+
+        DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(true);
+        for (String legacyVersion : legacyVersions)
+        {
+            logger.info("Loading legacy version: {}", legacyVersion);
+            truncateLegacyTables(legacyVersion);
+            loadLegacyTables(legacyVersion);
+            ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion));
+            if (cfs.getLiveSSTables().stream().anyMatch(s -> !s.descriptor.version.isLatestVersion()))
+                assertTrue(cfs.metric.oldVersionSSTableCount.getValue() > 0);
+            while (cfs.getLiveSSTables().stream().anyMatch(s -> !s.descriptor.version.isLatestVersion()))
+            {
+                CompactionManager.instance.submitBackground(cfs);
+                Thread.sleep(100);
+            }
+            assertTrue(cfs.metric.oldVersionSSTableCount.getValue() == 0);
+        }
+        DatabaseDescriptor.setAutomaticSSTableUpgradeEnabled(false);
+    }
+
     private void streamLegacyTables(String legacyVersion) throws Exception
     {
             logger.info("Streaming legacy version {}", legacyVersion);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java
index 26b2ff6..32b0b62 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsPrinterTest.java
@@ -35,6 +35,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
     public static final String expectedDefaultTable1Output =
         "\tTable: %s\n" +
         "\tSSTable count: 60000\n" +
+        "\tOld SSTable count: 0\n" +
         "\tSpace used (live): 0\n" +
         "\tSpace used (total): 9001\n" +
         "\tSpace used by snapshots (total): 1111\n" +
@@ -65,6 +66,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
     public static final String expectedDefaultTable2Output =
         "\tTable: %s\n" +
         "\tSSTable count: 3000\n" +
+        "\tOld SSTable count: 0\n" +
         "\tSpace used (live): 22\n" +
         "\tSpace used (total): 1024\n" +
         "\tSpace used by snapshots (total): 222\n" +
@@ -100,6 +102,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
     public static final String expectedDefaultTable3Output =
         "\tTable: %s\n" +
         "\tSSTable count: 50000\n" +
+        "\tOld SSTable count: 0\n" +
         "\tSpace used (live): 0\n" +
         "\tSpace used (total): 512\n" +
         "\tSpace used by snapshots (total): 0\n" +
@@ -130,6 +133,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
     public static final String expectedDefaultTable4Output =
         "\tTable: %s\n" +
         "\tSSTable count: 2000\n" +
+        "\tOld SSTable count: 0\n" +
         "\tSpace used (live): 4444\n" +
         "\tSpace used (total): 256\n" +
         "\tSpace used by snapshots (total): 44\n" +
@@ -165,6 +169,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
     public static final String expectedDefaultTable5Output =
         "\tTable: %s\n" +
         "\tSSTable count: 40000\n" +
+        "\tOld SSTable count: 0\n" +
         "\tSpace used (live): 55555\n" +
         "\tSpace used (total): 64\n" +
         "\tSpace used by snapshots (total): 55555\n" +
@@ -195,6 +200,7 @@ public class TableStatsPrinterTest extends TableStatsTestBase
     public static final String expectedDefaultTable6Output =
         "\tTable: %s\n" +
         "\tSSTable count: 1000\n" +
+        "\tOld SSTable count: 0\n" +
         "\tSpace used (live): 666666\n" +
         "\tSpace used (total): 0\n" +
         "\tSpace used by snapshots (total): 0\n" +

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d14a9266/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java
index bb56ef8..b2f1663 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/stats/TableStatsTestBase.java
@@ -70,6 +70,7 @@ public class TableStatsTestBase
         template.tableName = new String(tableName);
         template.isIndex = false;
         template.sstableCount = 0L;
+        template.oldSSTableCount = 0L;
         template.spaceUsedLive = "0";
         template.spaceUsedTotal = "0";
         template.spaceUsedBySnapshotsTotal = "0";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org