You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/08/10 09:20:44 UTC

[1/3] cassandra git commit: Add new JMX methods to change local compaction strategy

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 e389dc421 -> 929438b8b


Add new JMX methods to change local compaction strategy

Patch by marcuse; reviewed by iamaleksey for CASSANDRA-9965


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5aca7d79
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5aca7d79
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5aca7d79

Branch: refs/heads/cassandra-3.0
Commit: 5aca7d79aaf88f9c34dcae52f24bb62a28add91e
Parents: c8d163f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Aug 4 20:31:25 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Aug 10 09:02:47 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  3 +-
 .../org/apache/cassandra/config/CFMetaData.java | 10 ++-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 35 +++++++++
 .../cassandra/db/ColumnFamilyStoreMBean.java    | 21 +++++
 .../compaction/AbstractCompactionStrategy.java  |  2 +-
 .../compaction/WrappingCompactionStrategy.java  | 51 +++++++++---
 .../db/compaction/CompactionsCQLTest.java       | 82 +++++++++++++++++++-
 8 files changed, 190 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7151883..462de44 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.9
+ * Add new JMX methods to change local compaction strategy (CASSANDRA-9965)
  * Write hints for paxos commits (CASSANDRA-7342)
  * (cqlsh) Fix timestamps before 1970 on Windows, always
    use UTC for timestamp display (CASSANDRA-10000)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 0b64e31..f6e2665 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -24,7 +24,8 @@ Upgrading
     - Commit log files are no longer recycled by default, due to negative
       performance implications. This can be enabled again with the 
       commitlog_segment_recycling option in your cassandra.yaml 
-
+    - JMX methods set/getCompactionStrategyClass have been deprecated, use
+      set/getLocalCompactionStrategy or set/getLocalCompactionStrategyJson instead.
 
 2.1.8
 =====

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 4bc5f1b..2c6a30c 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1283,7 +1283,9 @@ public final class CFMetaData
         return strategyClass;
     }
 
-    public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)
+    public static AbstractCompactionStrategy createCompactionStrategyInstance(Class<? extends AbstractCompactionStrategy> compactionStrategyClass,
+                                                                              ColumnFamilyStore cfs,
+                                                                              Map<String, String> compactionStrategyOptions)
     {
         try
         {
@@ -1297,6 +1299,12 @@ public final class CFMetaData
         }
     }
 
+    @Deprecated
+    public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)
+    {
+        return createCompactionStrategyInstance(compactionStrategyClass, cfs, compactionStrategyOptions);
+    }
+
     // converts CFM to thrift CfDef
     public org.apache.cassandra.thrift.CfDef toThrift()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 6777e7a..f8d796e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -252,6 +252,41 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         };
     }
 
+    public void setLocalCompactionStrategyJson(String options)
+    {
+        setLocalCompactionStrategy(FBUtilities.fromJsonMap(options));
+    }
+
+    public String getLocalCompactionStrategyJson()
+    {
+        return FBUtilities.json(getLocalCompactionStrategy());
+    }
+
+    public void setLocalCompactionStrategy(Map<String, String> options)
+    {
+        try
+        {
+            Map<String, String> optionsCopy = new HashMap<>(options);
+            Class<? extends AbstractCompactionStrategy> compactionStrategyClass = CFMetaData.createCompactionStrategy(optionsCopy.get("class"));
+            optionsCopy.remove("class");
+            CFMetaData.validateCompactionOptions(compactionStrategyClass, optionsCopy);
+            compactionStrategyWrapper.setNewLocalCompactionStrategy(compactionStrategyClass, optionsCopy);
+        }
+        catch (Throwable t)
+        {
+            logger.error("Could not set new local compaction strategy", t);
+            // don't propagate the ConfigurationException over JMX; the user would only see a ClassNotFoundException
+            throw new IllegalArgumentException("Could not set new local compaction strategy: "+t.getMessage());
+        }
+    }
+
+    public Map<String, String> getLocalCompactionStrategy()
+    {
+        Map<String, String> options = new HashMap<>(compactionStrategyWrapper.options);
+        options.put("class", compactionStrategyWrapper.getName());
+        return options;
+    }
+
     public void setCompactionStrategyClass(String compactionStrategyClass)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 4df593b..e292be4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -311,14 +311,35 @@ public interface ColumnFamilyStoreMBean
     public void setMaximumCompactionThreshold(int threshold);
 
     /**
+     * Sets the compaction strategy locally for this node
+     *
+     * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted
+     *
+     * @param options compaction options with the same syntax as when doing ALTER ... WITH compaction = {..}
+     */
+    public void setLocalCompactionStrategyJson(String options);
+    public String getLocalCompactionStrategyJson();
+
+    /**
+     * Sets the compaction strategy locally for this node
+     *
+     * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted
+     *
+     * @param options compaction options map
+     */
+    public void setLocalCompactionStrategy(Map<String, String> options);
+    public Map<String, String> getLocalCompactionStrategy();
+    /**
      * Sets the compaction strategy by class name
      * @param className the name of the compaction strategy class
      */
+    @Deprecated
     public void setCompactionStrategyClass(String className);
 
     /**
      * Gets the compaction strategy class name
      */
+    @Deprecated
     public String getCompactionStrategyClass();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 73cda77..77ca404 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -60,7 +60,7 @@ public abstract class AbstractCompactionStrategy
     protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
     protected static final String COMPACTION_ENABLED = "enabled";
 
-    protected Map<String, String> options;
+    public Map<String, String> options;
 
     protected final ColumnFamilyStore cfs;
     protected float tombstoneThreshold;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index 0fed733..ae67599 100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
@@ -47,6 +48,16 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
     private static final Logger logger = LoggerFactory.getLogger(WrappingCompactionStrategy.class);
     private volatile AbstractCompactionStrategy repaired;
     private volatile AbstractCompactionStrategy unrepaired;
+    /*
+        We keep a copy of the schema compaction options and class here to be able to decide if we
+        should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
+
+        If a user changes the local compaction strategy and then later ALTERs a compaction option,
+        we will use the new compaction options.
+     */
+    private Map<String, String> schemaCompactionOptions;
+    private Class<?> schemaCompactionStrategyClass;
+
     public WrappingCompactionStrategy(ColumnFamilyStore cfs)
     {
         super(cfs, cfs.metadata.compactionStrategyOptions);
@@ -146,10 +157,9 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
 
     public synchronized void maybeReloadCompactionStrategy(CFMetaData metadata)
     {
-        if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass)
-            && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass)
-            && repaired.options.equals(metadata.compactionStrategyOptions)
-            && unrepaired.options.equals(metadata.compactionStrategyOptions))
+        // compare the old schema configuration to the new one, ignore any locally set changes.
+        if (metadata.compactionStrategyClass.equals(schemaCompactionStrategyClass) &&
+            metadata.compactionStrategyOptions.equals(schemaCompactionOptions))
             return;
         reloadCompactionStrategy(metadata);
     }
@@ -157,13 +167,10 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
     public synchronized void reloadCompactionStrategy(CFMetaData metadata)
     {
         boolean disabledWithJMX = !enabled && shouldBeEnabled();
-        if (repaired != null)
-            repaired.shutdown();
-        if (unrepaired != null)
-            unrepaired.shutdown();
-        repaired = metadata.createCompactionStrategyInstance(cfs);
-        unrepaired = metadata.createCompactionStrategyInstance(cfs);
-        options = ImmutableMap.copyOf(metadata.compactionStrategyOptions);
+        setStrategy(metadata.compactionStrategyClass, metadata.compactionStrategyOptions);
+        schemaCompactionOptions = ImmutableMap.copyOf(metadata.compactionStrategyOptions);
+        schemaCompactionStrategyClass = repaired.getClass();
+
         if (disabledWithJMX || !shouldBeEnabled())
             disable();
         else
@@ -393,4 +400,26 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
     {
         return Arrays.asList(repaired, unrepaired);
     }
+
+    public synchronized void setNewLocalCompactionStrategy(Class<? extends AbstractCompactionStrategy> compactionStrategyClass, Map<String, String> options)
+    {
+        logger.info("Switching local compaction strategy from {} to {} with options={}", repaired == null ? "null" : repaired.getClass(), compactionStrategyClass, options);
+        setStrategy(compactionStrategyClass, options);
+        if (shouldBeEnabled())
+            enable();
+        else
+            disable();
+        startup();
+    }
+
+    private void setStrategy(Class<? extends AbstractCompactionStrategy> compactionStrategyClass, Map<String, String> options)
+    {
+        if (repaired != null)
+            repaired.shutdown();
+        if (unrepaired != null)
+            unrepaired.shutdown();
+        repaired = CFMetaData.createCompactionStrategyInstance(compactionStrategyClass, cfs, options);
+        unrepaired = CFMetaData.createCompactionStrategyInstance(compactionStrategyClass, cfs, options);
+        this.options = ImmutableMap.copyOf(options);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 58fc062..2798689 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -17,12 +17,16 @@
  */
 package org.apache.cassandra.db.compaction;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.junit.Test;
 
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -141,12 +145,88 @@ public class CompactionsCQLTest extends CQLTester
         assertTrue(minorWasTriggered(KEYSPACE, currentTable()));
     }
 
+    @Test
+    public void testSetLocalCompactionStrategy() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+        Map<String, String> localOptions = new HashMap<>();
+        localOptions.put("class", "DateTieredCompactionStrategy");
+        getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+        WrappingCompactionStrategy wrappingCompactionStrategy = (WrappingCompactionStrategy) getCurrentColumnFamilyStore().getCompactionStrategy();
+        assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class));
+        // altering something non-compaction related
+        execute("ALTER TABLE %s WITH gc_grace_seconds = 1000");
+        // should keep the local compaction strat
+        assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class));
+        // altering a compaction option
+        execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':3}");
+        // will use the new option
+        assertTrue(verifyStrategies(wrappingCompactionStrategy, SizeTieredCompactionStrategy.class));
+    }
+
+
+    @Test
+    public void testSetLocalCompactionStrategyDisable() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+        Map<String, String> localOptions = new HashMap<>();
+        localOptions.put("class", "DateTieredCompactionStrategy");
+        localOptions.put("enabled", "false");
+        getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+        assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
+        localOptions.clear();
+        localOptions.put("class", "DateTieredCompactionStrategy");
+        // localOptions.put("enabled", "true"); - this is default!
+        getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
+    }
+
+
+    @Test
+    public void testSetLocalCompactionStrategyEnable() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+        Map<String, String> localOptions = new HashMap<>();
+        localOptions.put("class", "DateTieredCompactionStrategy");
+
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
+
+        getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
+
+    }
+
+
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBadLocalCompactionStrategyOptions()
+    {
+        createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+        Map<String, String> localOptions = new HashMap<>();
+        localOptions.put("class","SizeTieredCompactionStrategy");
+        localOptions.put("sstable_size_in_mb","1234"); // not for STCS
+        getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+    }
+
+    public boolean verifyStrategies(WrappingCompactionStrategy wrappingStrategy, Class<? extends AbstractCompactionStrategy> expected)
+    {
+        boolean found = false;
+        for (AbstractCompactionStrategy actualStrategy : wrappingStrategy.getWrappedStrategies())
+        {
+            if (!actualStrategy.getClass().equals(expected))
+                return false;
+            found = true;
+        }
+        return found;
+    }
+
     private ColumnFamilyStore getCurrentColumnFamilyStore()
     {
         return Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
     }
 
-    public boolean minorWasTriggered(String keyspace, String cf) throws Throwable
+    private boolean minorWasTriggered(String keyspace, String cf) throws Throwable
     {
         UntypedResultSet res = execute("SELECT * FROM system.compaction_history");
         boolean minorWasTriggered = false;


[2/3] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/config/CFMetaData.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c3b967e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c3b967e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c3b967e

Branch: refs/heads/cassandra-3.0
Commit: 9c3b967e7186c1c3b6f1c25c627e770187020344
Parents: 6d0cf7d 5aca7d7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 10 09:08:39 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Aug 10 09:08:39 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  3 +-
 .../org/apache/cassandra/config/CFMetaData.java | 10 ++-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 35 +++++++++
 .../cassandra/db/ColumnFamilyStoreMBean.java    | 21 +++++
 .../compaction/AbstractCompactionStrategy.java  |  2 +-
 .../compaction/WrappingCompactionStrategy.java  | 51 +++++++++---
 .../db/compaction/CompactionsCQLTest.java       | 82 +++++++++++++++++++-
 8 files changed, 190 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a913fe7,462de44..772455c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
 -2.1.9
 +2.2.1
 + * Add checksum to saved cache files (CASSANDRA-9265)
 + * Log warning when using an aggregate without partition key (CASSANDRA-9737)
 + * Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
 + * UDF / UDA execution time in trace (CASSANDRA-9723)
 + * Fix broken internode SSL (CASSANDRA-9884)
 +Merged from 2.1:
+  * Add new JMX methods to change local compaction strategy (CASSANDRA-9965)
   * Write hints for paxos commits (CASSANDRA-7342)
   * (cqlsh) Fix timestamps before 1970 on Windows, always
     use UTC for timestamp display (CASSANDRA-10000)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/NEWS.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index d8eeaf2,2c6a30c..6468973
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -876,6 -1299,57 +878,12 @@@ public final class CFMetaDat
          }
      }
  
+     @Deprecated
+     public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)
+     {
+         return createCompactionStrategyInstance(compactionStrategyClass, cfs, compactionStrategyOptions);
+     }
+ 
 -    // converts CFM to thrift CfDef
 -    public org.apache.cassandra.thrift.CfDef toThrift()
 -    {
 -        org.apache.cassandra.thrift.CfDef def = new org.apache.cassandra.thrift.CfDef(ksName, cfName);
 -        def.setColumn_type(cfType.name());
 -
 -        if (isSuper())
 -        {
 -            def.setComparator_type(comparator.subtype(0).toString());
 -            def.setSubcomparator_type(comparator.subtype(1).toString());
 -        }
 -        else
 -        {
 -            def.setComparator_type(comparator.toString());
 -        }
 -
 -        def.setComment(Strings.nullToEmpty(comment));
 -        def.setRead_repair_chance(readRepairChance);
 -        def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
 -        def.setGc_grace_seconds(gcGraceSeconds);
 -        def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
 -        def.setKey_validation_class(keyValidator.toString());
 -        def.setMin_compaction_threshold(minCompactionThreshold);
 -        def.setMax_compaction_threshold(maxCompactionThreshold);
 -        // We only return the alias if only one is set since thrift don't know about multiple key aliases
 -        if (partitionKeyColumns.size() == 1)
 -            def.setKey_alias(partitionKeyColumns.get(0).name.bytes);
 -        def.setColumn_metadata(ColumnDefinition.toThrift(columnMetadata));
 -        def.setCompaction_strategy(compactionStrategyClass.getName());
 -        def.setCompaction_strategy_options(new HashMap<>(compactionStrategyOptions));
 -        def.setCompression_options(compressionParameters.asThriftOptions());
 -        if (bloomFilterFpChance != null)
 -            def.setBloom_filter_fp_chance(bloomFilterFpChance);
 -        def.setMin_index_interval(minIndexInterval);
 -        def.setMax_index_interval(maxIndexInterval);
 -        def.setMemtable_flush_period_in_ms(memtableFlushPeriod);
 -        def.setCaching(caching.toThriftCaching());
 -        def.setCells_per_row_to_cache(caching.toThriftCellsPerRow());
 -        def.setDefault_time_to_live(defaultTimeToLive);
 -        def.setSpeculative_retry(speculativeRetry.toString());
 -        def.setTriggers(TriggerDefinition.toThrift(triggers));
 -
 -        return def;
 -    }
 -
      /**
       * Returns the ColumnDefinition for {@code name}.
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------


[3/3] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0

Conflicts:
	src/java/org/apache/cassandra/config/CFMetaData.java
	src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
	test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/929438b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/929438b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/929438b8

Branch: refs/heads/cassandra-3.0
Commit: 929438b8be32e38f6d921bfdc4a011cd526dfeb7
Parents: e389dc4 9c3b967
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 10 09:12:52 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Aug 10 09:13:01 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  3 +-
 .../org/apache/cassandra/config/CFMetaData.java |  7 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 29 ++++++-
 .../cassandra/db/ColumnFamilyStoreMBean.java    | 19 +++--
 .../compaction/CompactionStrategyManager.java   | 51 +++++++++---
 .../db/compaction/CompactionsCQLTest.java       | 81 +++++++++++++++++++-
 7 files changed, 166 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7cb9f16,772455c..639dd59
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -19,39 -13,6 +19,40 @@@ Merged from 2.1
     when both exist (CASSANDRA-9777)
   * Release snapshot selfRef when doing snapshot repair (CASSANDRA-9998)
   * Cannot replace token does not exist - DN node removed as Fat Client (CASSANDRA-9871)
 +Merged from 2.0:
 + * Don't cast expected bf size to an int (CASSANDRA-9959)
 +
 +
 +3.0.0-alpha1
 + * Implement proper sandboxing for UDFs (CASSANDRA-9402)
 + * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)
 + * Allow extra schema definitions in cassandra-stress yaml (CASSANDRA-9850)
 + * Metrics should use up to date nomenclature (CASSANDRA-9448)
 + * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)
 + * Cleanup crc and adler code for java 8 (CASSANDRA-9650)
 + * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808, 9825,
 +   9848, 9705, 9859, 9867, 9874, 9828, 9801)
 + * Update Guava to 18.0 (CASSANDRA-9653)
 + * Bloom filter false positive ratio is not honoured (CASSANDRA-8413)
 + * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522)
 + * Change hinted_handoff_enabled yaml setting, JMX (CASSANDRA-9035)
 + * Add algorithmic token allocation (CASSANDRA-7032)
 + * Add nodetool command to replay batchlog (CASSANDRA-9547)
 + * Make file buffer cache independent of paths being read (CASSANDRA-8897)
 + * Remove deprecated legacy Hadoop code (CASSANDRA-9353)
 + * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801)
 + * Change gossip stabilization to use endpoit size (CASSANDRA-9401)
 + * Change default garbage collector to G1 (CASSANDRA-7486)
 + * Populate TokenMetadata early during startup (CASSANDRA-9317)
 + * Undeprecate cache recentHitRate (CASSANDRA-6591)
 + * Add support for selectively varint encoding fields (CASSANDRA-9499, 9865)
 + * Materialized Views (CASSANDRA-6477)
 +Merged from 2.2:
 + * Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
 + * UDF / UDA execution time in trace (CASSANDRA-9723)
 + * Fix broken internode SSL (CASSANDRA-9884)
 +Merged from 2.1:
++ * Add new JMX methods to change local compaction strategy (CASSANDRA-9965)
   * Fix handling of enable/disable autocompaction (CASSANDRA-9899)
   * Add consistency level to tracing ouput (CASSANDRA-9827)
   * Remove repair snapshot leftover on startup (CASSANDRA-7357)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 0fa4ded,ccc5cc8..5a690bd
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,55 -13,6 +13,56 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 +3.0
 +===
 +
 +New features
 +------------
 +   - Materialized Views, which allow for server-side denormalization, is now
 +     available. Materialized views provide an alternative to secondary indexes
 +     for non-primary key queries, and perform much better for indexing high
 +     cardinality columns.
 +     See http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views
 +
 +
 +Upgrading
 +---------
 +   - 3.0 requires Java 8u20 or later.
 +   - The default JVM GC has been changed to G1GC.
 +   - The default JVM flag -XX:+PerfDisableSharedMem will cause the following tools JVM
 +     to stop working: jps, jstack, jinfo, jmc, jcmd as well as 3rd party tools like Jolokia.
 +     If you wish to use these tools you can comment this flag out in cassandra-env.{sh,ps1}
 +   - New write stages have been added for batchlog and materialized view mutations
 +     you can set their size in cassandra.yaml
 +   - User defined functions are now executed in a sandbox.
 +     To use UDFs and UDAs, you have to enable them in cassandra.yaml.
 +   - New SSTable version 'la' with improved bloom-filter false-positive handling
 +     compared to previous version 'ka' used in 2.2 and 2.1. Running sstableupgrade
 +     is not necessary but recommended.
 +   - Before upgrading to 3.0, make sure that your cluster is in complete agreement
 +     (schema versions outputted by `nodetool describecluster` are all the same).
 +   - Schema metadata is now stored in the new `system_schema` keyspace, and
 +     legacy `system.schema_*` tables are now gone; see CASSANDRA-6717 for details.
 +   - Pig's CassandraStorage has been removed. Use CqlNativeStorage instead.
 +   - Hadoop BulkOutputFormat and BulkRecordWriter have been removed; use
 +     CqlBulkOutputFormat and CqlBulkRecordWriter instead.
 +   - Hadoop ColumnFamilyInputFormat and ColumnFamilyOutputFormat have been removed;
 +     use CqlInputFormat and CqlOutputFormat instead.
 +   - Hadoop ColumnFamilyRecordReader and ColumnFamilyRecordWriter have been removed;
 +     use CqlRecordReader and CqlRecordWriter instead.
 +   - hinted_handoff_enabled in cassandra.yaml no longer supports a list of data centers.
 +     To specify a list of excluded data centers when hinted_handoff_enabled is set to true,
 +     use hinted_handoff_disabled_datacenters, see CASSANDRA-9035 for details.
 +   - The `sstable_compression` and `chunk_length_kb` compression options have been deprecated.
 +     The new options are `class` and `chunk_length_in_kb`. Disabling compression should now
 +     be done by setting the new option `enabled` to `false`.
 +   - Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax
 +     has been deprecated since 2.1.0 and is being removed in 3.0.0.
 +   - Batchlog entries are now stored in a new table - system.batches.
 +     The old one has been deprecated.
- 
++   - JMX methods set/getCompactionStrategyClass have been removed, use
++     set/getLocalCompactionStrategy or set/getLocalCompactionStrategyJson instead.
 +
  2.2
  ===
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 1d38274,6468973..7719587
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -772,25 -819,60 +772,26 @@@ public final class CFMetaDat
              throw new ConfigurationException(String.format("Column family comparators do not match or are not compatible (found %s; expected %s).", cfm.comparator.getClass().getSimpleName(), comparator.getClass().getSimpleName()));
      }
  
 -    public static void validateCompactionOptions(Class<? extends AbstractCompactionStrategy> strategyClass, Map<String, String> options) throws ConfigurationException
 +    public static Class<? extends AbstractCompactionStrategy> createCompactionStrategy(String className) throws ConfigurationException
 +    {
 +        className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className;
 +        Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy");
 +        if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass))
 +            throw new ConfigurationException(String.format("Specified compaction strategy class (%s) is not derived from AbstractReplicationStrategy", className));
 +
 +        return strategyClass;
 +    }
 +
-     public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)
++    public static AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs,
++                                                                              CompactionParams compactionParams)
      {
          try
          {
 -            if (options == null)
 -                return;
 -
 -            Map<?,?> unknownOptions = (Map) strategyClass.getMethod("validateOptions", Map.class).invoke(null, options);
 -            if (!unknownOptions.isEmpty())
 -                throw new ConfigurationException(String.format("Properties specified %s are not understood by %s", unknownOptions.keySet(), strategyClass.getSimpleName()));
 +            Constructor<? extends AbstractCompactionStrategy> constructor =
-                 params.compaction.klass().getConstructor(ColumnFamilyStore.class, Map.class);
-             return constructor.newInstance(cfs, params.compaction.options());
++                compactionParams.klass().getConstructor(ColumnFamilyStore.class, Map.class);
++            return constructor.newInstance(cfs, compactionParams.options());
          }
 -        catch (NoSuchMethodException e)
 -        {
 -            logger.warn("Compaction Strategy {} does not have a static validateOptions method. Validation ignored", strategyClass.getName());
 -        }
 -        catch (InvocationTargetException e)
 -        {
 -            if (e.getTargetException() instanceof ConfigurationException)
 -                throw (ConfigurationException) e.getTargetException();
 -            throw new ConfigurationException("Failed to validate compaction options: " + options);
 -        }
 -        catch (ConfigurationException e)
 -        {
 -            throw e;
 -        }
 -        catch (Exception e)
 -        {
 -            throw new ConfigurationException("Failed to validate compaction options: " + options);
 -        }
 -    }
 -
 -    public static Class<? extends AbstractCompactionStrategy> createCompactionStrategy(String className) throws ConfigurationException
 -    {
 -        className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className;
 -        Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy");
 -        if (className.equals(WrappingCompactionStrategy.class.getName()))
 -            throw new ConfigurationException("You can't set WrappingCompactionStrategy as the compaction strategy!");
 -        if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass))
 -            throw new ConfigurationException(String.format("Specified compaction strategy class (%s) is not derived from AbstractReplicationStrategy", className));
 -
 -        return strategyClass;
 -    }
 -
 -    public static AbstractCompactionStrategy createCompactionStrategyInstance(Class<? extends AbstractCompactionStrategy> compactionStrategyClass,
 -                                                                              ColumnFamilyStore cfs,
 -                                                                              Map<String, String> compactionStrategyOptions)
 -    {
 -        try
 -        {
 -            Constructor<? extends AbstractCompactionStrategy> constructor =
 -                compactionStrategyClass.getConstructor(ColumnFamilyStore.class, Map.class);
 -            return constructor.newInstance(cfs, compactionStrategyOptions);
 -        }
 -        catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e)
 +        catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e)
          {
              throw new RuntimeException(e);
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index beb2b93,6b71be9..4ae6694
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -248,14 -259,57 +248,35 @@@ public class ColumnFamilyStore implemen
          };
      }
  
-     public void setCompactionStrategyClass(String compactionStrategyClass)
+     public void setLocalCompactionStrategyJson(String options)
      {
-         throw new UnsupportedOperationException("ColumnFamilyStore.setCompactionStrategyClass() method is no longer supported");
+         setLocalCompactionStrategy(FBUtilities.fromJsonMap(options));
      }
  
-     public String getCompactionStrategyClass()
+     public String getLocalCompactionStrategyJson()
      {
-         return metadata.params.compaction.klass().getName();
+         return FBUtilities.json(getLocalCompactionStrategy());
+     }
+ 
+     public void setLocalCompactionStrategy(Map<String, String> options)
+     {
+         try
+         {
 -            Map<String, String> optionsCopy = new HashMap<>(options);
 -            Class<? extends AbstractCompactionStrategy> compactionStrategyClass = CFMetaData.createCompactionStrategy(optionsCopy.get("class"));
 -            optionsCopy.remove("class");
 -            CFMetaData.validateCompactionOptions(compactionStrategyClass, optionsCopy);
 -            compactionStrategyWrapper.setNewLocalCompactionStrategy(compactionStrategyClass, optionsCopy);
++            CompactionParams compactionParams = CompactionParams.fromMap(options);
++            compactionParams.validate();
++            compactionStrategyManager.setNewLocalCompactionStrategy(compactionParams);
+         }
+         catch (Throwable t)
+         {
+             logger.error("Could not set new local compaction strategy", t);
+             // don't propagate the ConfigurationException over JMX; the user would only see a ClassNotFoundException
+             throw new IllegalArgumentException("Could not set new local compaction strategy: "+t.getMessage());
+         }
+     }
+ 
+     public Map<String, String> getLocalCompactionStrategy()
+     {
 -        Map<String, String> options = new HashMap<>(compactionStrategyWrapper.options);
 -        options.put("class", compactionStrategyWrapper.getName());
 -        return options;
 -    }
 -
 -    public void setCompactionStrategyClass(String compactionStrategyClass)
 -    {
 -        try
 -        {
 -            metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass);
 -            compactionStrategyWrapper.maybeReloadCompactionStrategy(metadata);
 -        }
 -        catch (ConfigurationException e)
 -        {
 -            throw new IllegalArgumentException(e.getMessage());
 -        }
 -    }
 -
 -    public String getCompactionStrategyClass()
 -    {
 -        return metadata.compactionStrategyClass.getName();
++        return compactionStrategyManager.getCompactionParams().asMap();
      }
  
      public Map<String,String> getCompressionParameters()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index c23df74,1a8ba1d..84c6dd1
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@@ -70,15 -67,36 +70,24 @@@ public interface ColumnFamilyStoreMBea
      public void setMaximumCompactionThreshold(int threshold);
  
      /**
-      * Sets the compaction strategy by class name
-      * @param className the name of the compaction strategy class
+      * Sets the compaction strategy locally for this node
+      *
+      * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted
+      *
+      * @param options compaction options with the same syntax as when doing ALTER ... WITH compaction = {..}
       */
-     public void setCompactionStrategyClass(String className);
+     public void setLocalCompactionStrategyJson(String options);
+     public String getLocalCompactionStrategyJson();
  
      /**
-      * Gets the compaction strategy class name
+      * Sets the compaction strategy locally for this node
+      *
+      * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted
+      *
+      * @param options compaction options map
       */
-     public String getCompactionStrategyClass();
+     public void setLocalCompactionStrategy(Map<String, String> options);
+     public Map<String, String> getLocalCompactionStrategy();
 -    /**
 -     * Sets the compaction strategy by class name
 -     * @param className the name of the compaction strategy class
 -     */
 -    @Deprecated
 -    public void setCompactionStrategyClass(String className);
 -
 -    /**
 -     * Gets the compaction strategy class name
 -     */
 -    @Deprecated
 -    public String getCompactionStrategyClass();
  
      /**
       * Get the compression parameters

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 4f6dfa2,0000000..7204da0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -1,453 -1,0 +1,482 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.compaction;
 +
 +
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +
 +import com.google.common.collect.Iterables;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.notifications.*;
 +import org.apache.cassandra.schema.CompactionParams;
 +
 +/**
 + * Manages the compaction strategies.
 + *
 + * Currently has two instances of actual compaction strategies - one for repaired data and one for
 + * unrepaired data. This is done to be able to totally separate the different sets of sstables.
 + */
 +public class CompactionStrategyManager implements INotificationConsumer
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
 +    private final ColumnFamilyStore cfs;
 +    private volatile AbstractCompactionStrategy repaired;
 +    private volatile AbstractCompactionStrategy unrepaired;
 +    private volatile boolean enabled = true;
 +    public boolean isActive = true;
 +    private volatile CompactionParams params;
++    /*
++        We keep a copy of the schema compaction parameters here to be able to decide if we
++        should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
++
++        If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
++        we will use the new compaction parameters.
++     */
++    private CompactionParams schemaCompactionParams;
 +
 +    public CompactionStrategyManager(ColumnFamilyStore cfs)
 +    {
 +        cfs.getTracker().subscribe(this);
 +        logger.debug("{} subscribed to the data tracker.", this);
 +        this.cfs = cfs;
 +        reload(cfs.metadata);
 +        params = cfs.metadata.params.compaction;
 +        enabled = params.isEnabled();
 +    }
 +
 +    /**
 +     * Return the next background task
 +     *
 +     * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
 +     *
 +     */
 +    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
 +    {
 +        if (!isEnabled())
 +            return null;
 +
 +        maybeReload(cfs.metadata);
 +
 +        if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
 +        {
 +            AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
 +            if (repairedTask != null)
 +                return repairedTask;
 +            return unrepaired.getNextBackgroundTask(gcBefore);
 +        }
 +        else
 +        {
 +            AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
 +            if (unrepairedTask != null)
 +                return unrepairedTask;
 +            return repaired.getNextBackgroundTask(gcBefore);
 +        }
 +    }
 +
 +    public boolean isEnabled()
 +    {
 +        return enabled && isActive;
 +    }
 +
 +    public synchronized void resume()
 +    {
 +        isActive = true;
 +    }
 +
 +    /**
 +     * pause compaction while we cancel all ongoing compactions
 +     *
 +     * Separate call from enable/disable to not have to save the enabled-state externally
 +      */
 +    public synchronized void pause()
 +    {
 +        isActive = false;
 +    }
 +
 +
 +    private void startup()
 +    {
 +        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
 +        {
 +            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
 +                getCompactionStrategyFor(sstable).addSSTable(sstable);
 +        }
 +        repaired.startup();
 +        unrepaired.startup();
 +    }
 +
 +    /**
 +     * return the compaction strategy for the given sstable
 +     *
 +     * returns differently based on the repaired status
 +     * @param sstable
 +     * @return
 +     */
 +    private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
 +    {
 +        if (sstable.isRepaired())
 +            return repaired;
 +        else
 +            return unrepaired;
 +    }
 +
 +    public void shutdown()
 +    {
 +        isActive = false;
 +        repaired.shutdown();
 +        unrepaired.shutdown();
 +    }
 +
 +    public synchronized void maybeReload(CFMetaData metadata)
 +    {
-         if (repaired != null && repaired.getClass().equals(metadata.params.compaction.klass())
-                 && unrepaired != null && unrepaired.getClass().equals(metadata.params.compaction.klass())
-                 && repaired.options.equals(metadata.params.compaction.options()) // todo: assumes all have the same options
-                 && unrepaired.options.equals(metadata.params.compaction.options()))
++        // compare the old schema configuration to the new one, ignore any locally set changes.
++        if (metadata.params.compaction.equals(schemaCompactionParams))
 +            return;
 +        reload(metadata);
 +    }
 +
 +    /**
 +     * Reload the compaction strategies
 +     *
 +     * Called after changing configuration and at startup.
 +     * @param metadata
 +     */
 +    public synchronized void reload(CFMetaData metadata)
 +    {
 +        boolean disabledWithJMX = !enabled && shouldBeEnabled();
-         if (repaired != null)
-             repaired.shutdown();
-         if (unrepaired != null)
-             unrepaired.shutdown();
-         repaired = metadata.createCompactionStrategyInstance(cfs);
-         unrepaired = metadata.createCompactionStrategyInstance(cfs);
-         params = metadata.params.compaction;
++        setStrategy(metadata.params.compaction);
++        schemaCompactionParams = metadata.params.compaction;
++
 +        if (disabledWithJMX || !shouldBeEnabled())
 +            disable();
 +        else
 +            enable();
 +        startup();
 +    }
 +
 +    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
 +    {
 +        cfs.getTracker().replaceFlushed(memtable, sstable);
 +        if (sstable != null)
 +            CompactionManager.instance.submitBackground(cfs);
 +    }
 +
 +    public int getUnleveledSSTables()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
 +        {
 +            int count = 0;
 +            count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
 +            count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
 +            return count;
 +        }
 +        return 0;
 +    }
 +
 +    public synchronized int[] getSSTableCountPerLevel()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
 +        {
 +            int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
 +            int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
 +            res = sumArrays(res, repairedCountPerLevel);
 +            int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
 +            res = sumArrays(res, unrepairedCountPerLevel);
 +            return res;
 +        }
 +        return null;
 +    }
 +
 +    private static int[] sumArrays(int[] a, int[] b)
 +    {
 +        int[] res = new int[Math.max(a.length, b.length)];
 +        for (int i = 0; i < res.length; i++)
 +        {
 +            if (i < a.length && i < b.length)
 +                res[i] = a[i] + b[i];
 +            else if (i < a.length)
 +                res[i] = a[i];
 +            else
 +                res[i] = b[i];
 +        }
 +        return res;
 +    }
 +
 +    public boolean shouldDefragment()
 +    {
 +        assert repaired.getClass().equals(unrepaired.getClass());
 +        return repaired.shouldDefragment();
 +    }
 +
 +
 +    public synchronized void handleNotification(INotification notification, Object sender)
 +    {
 +        if (notification instanceof SSTableAddedNotification)
 +        {
 +            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
 +            if (flushedNotification.added.isRepaired())
 +                repaired.addSSTable(flushedNotification.added);
 +            else
 +                unrepaired.addSSTable(flushedNotification.added);
 +        }
 +        else if (notification instanceof SSTableListChangedNotification)
 +        {
 +            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
 +            Set<SSTableReader> repairedRemoved = new HashSet<>();
 +            Set<SSTableReader> repairedAdded = new HashSet<>();
 +            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
 +            Set<SSTableReader> unrepairedAdded = new HashSet<>();
 +
 +            for (SSTableReader sstable : listChangedNotification.removed)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedRemoved.add(sstable);
 +                else
 +                    unrepairedRemoved.add(sstable);
 +            }
 +            for (SSTableReader sstable : listChangedNotification.added)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedAdded.add(sstable);
 +                else
 +                    unrepairedAdded.add(sstable);
 +            }
 +            if (!repairedRemoved.isEmpty())
 +            {
 +                repaired.replaceSSTables(repairedRemoved, repairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : repairedAdded)
 +                    repaired.addSSTable(sstable);
 +            }
 +
 +            if (!unrepairedRemoved.isEmpty())
 +            {
 +                unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : unrepairedAdded)
 +                    unrepaired.addSSTable(sstable);
 +            }
 +        }
 +        else if (notification instanceof SSTableRepairStatusChanged)
 +        {
 +            for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
 +            {
 +                if (sstable.isRepaired())
 +                {
 +                    unrepaired.removeSSTable(sstable);
 +                    repaired.addSSTable(sstable);
 +                }
 +                else
 +                {
 +                    repaired.removeSSTable(sstable);
 +                    unrepaired.addSSTable(sstable);
 +                }
 +            }
 +        }
 +        else if (notification instanceof SSTableDeletingNotification)
 +        {
 +            SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
 +            if (sstable.isRepaired())
 +                repaired.removeSSTable(sstable);
 +            else
 +                unrepaired.removeSSTable(sstable);
 +        }
 +    }
 +
 +    public void enable()
 +    {
 +        if (repaired != null)
 +            repaired.enable();
 +        if (unrepaired != null)
 +            unrepaired.enable();
 +        // enable this last to make sure the strategies are ready to get calls.
 +        enabled = true;
 +    }
 +
 +    public void disable()
 +    {
 +        // disable this first to avoid asking disabled strategies for compaction tasks
 +        enabled = false;
 +        if (repaired != null)
 +            repaired.disable();
 +        if (unrepaired != null)
 +            unrepaired.disable();
 +    }
 +
 +    /**
 +     * Create ISSTableScanner from the given sstables
 +     *
 +     * Delegates the call to the compaction strategies to allow LCS to create a scanner
 +     * @param sstables
 +     * @param range
 +     * @return
 +     */
 +    @SuppressWarnings("resource")
 +    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
 +    {
 +        List<SSTableReader> repairedSSTables = new ArrayList<>();
 +        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable.isRepaired())
 +                repairedSSTables.add(sstable);
 +            else
 +                unrepairedSSTables.add(sstable);
 +        }
 +
 +        Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
 +
 +        for (Range<Token> range : ranges)
 +        {
 +            AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
 +            AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
 +
 +            for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
 +            {
 +                if (!scanners.add(scanner))
 +                    scanner.close();
 +            }
 +        }
 +
 +        return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
 +    }
 +
 +    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
 +    {
 +        return getScanners(sstables, Collections.singleton(null));
 +    }
 +
 +    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
 +    {
 +        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
 +    }
 +
 +    public long getMaxSSTableBytes()
 +    {
 +        return unrepaired.getMaxSSTableBytes();
 +    }
 +
 +    public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
 +    {
 +        return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
 +    }
 +
 +    public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
 +    {
 +        // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
 +        // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
 +        // sstables are marked the compactions are re-enabled
 +        return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
 +        {
 +            @Override
 +            public Collection<AbstractCompactionTask> call() throws Exception
 +            {
 +                synchronized (CompactionStrategyManager.this)
 +                {
 +                    Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
 +                    Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
 +
 +                    if (repairedTasks == null && unrepairedTasks == null)
 +                        return null;
 +
 +                    if (repairedTasks == null)
 +                        return unrepairedTasks;
 +                    if (unrepairedTasks == null)
 +                        return repairedTasks;
 +
 +                    List<AbstractCompactionTask> tasks = new ArrayList<>();
 +                    tasks.addAll(repairedTasks);
 +                    tasks.addAll(unrepairedTasks);
 +                    return tasks;
 +                }
 +            }
 +        }, false, false);
 +    }
 +
 +    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
 +    {
 +        return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
 +    }
 +
 +    public int getEstimatedRemainingTasks()
 +    {
 +        int tasks = 0;
 +        tasks += repaired.getEstimatedRemainingTasks();
 +        tasks += unrepaired.getEstimatedRemainingTasks();
 +
 +        return tasks;
 +    }
 +
 +    public boolean shouldBeEnabled()
 +    {
 +        return params.isEnabled();
 +    }
 +
 +    public String getName()
 +    {
 +        return unrepaired.getName();
 +    }
 +
 +    public List<AbstractCompactionStrategy> getStrategies()
 +    {
 +        return Arrays.asList(repaired, unrepaired);
 +    }
++
++    public synchronized void setNewLocalCompactionStrategy(CompactionParams params)
++    {
++        logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
++        setStrategy(params);
++        if (shouldBeEnabled())
++            enable();
++        else
++            disable();
++        startup();
++    }
++
++    private void setStrategy(CompactionParams params)
++    {
++        if (repaired != null)
++            repaired.shutdown();
++        if (unrepaired != null)
++            unrepaired.shutdown();
++        repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
++        unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
++        this.params = params;
++    }
++
++    public CompactionParams getCompactionParams()
++    {
++        return params;
++    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 0db231e,2798689..63b21df
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@@ -141,7 -145,88 +145,82 @@@ public class CompactionsCQLTest extend
          assertTrue(minorWasTriggered(KEYSPACE, currentTable()));
      }
  
-     public boolean minorWasTriggered(String keyspace, String cf) throws Throwable
+     @Test
+     public void testSetLocalCompactionStrategy() throws Throwable
+     {
+         createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+         Map<String, String> localOptions = new HashMap<>();
+         localOptions.put("class", "DateTieredCompactionStrategy");
+         getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
 -        WrappingCompactionStrategy wrappingCompactionStrategy = (WrappingCompactionStrategy) getCurrentColumnFamilyStore().getCompactionStrategy();
 -        assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class));
++        assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+         // altering something non-compaction related
+         execute("ALTER TABLE %s WITH gc_grace_seconds = 1000");
+         // should keep the local compaction strat
 -        assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class));
++        assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+         // altering a compaction option
+         execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':3}");
+         // will use the new option
 -        assertTrue(verifyStrategies(wrappingCompactionStrategy, SizeTieredCompactionStrategy.class));
++        assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), SizeTieredCompactionStrategy.class));
+     }
+ 
+ 
+     @Test
+     public void testSetLocalCompactionStrategyDisable() throws Throwable
+     {
+         createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+         Map<String, String> localOptions = new HashMap<>();
+         localOptions.put("class", "DateTieredCompactionStrategy");
+         localOptions.put("enabled", "false");
+         getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
 -        assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++        assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+         localOptions.clear();
+         localOptions.put("class", "DateTieredCompactionStrategy");
+         // localOptions.put("enabled", "true"); - this is default!
+         getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
 -        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+     }
+ 
+ 
+     @Test
+     public void testSetLocalCompactionStrategyEnable() throws Throwable
+     {
+         createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+         Map<String, String> localOptions = new HashMap<>();
+         localOptions.put("class", "DateTieredCompactionStrategy");
+ 
+         getCurrentColumnFamilyStore().disableAutoCompaction();
 -        assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++        assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+ 
+         getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
 -        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+ 
+     }
+ 
+ 
+ 
+     @Test(expected = IllegalArgumentException.class)
+     public void testBadLocalCompactionStrategyOptions()
+     {
+         createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+         Map<String, String> localOptions = new HashMap<>();
+         localOptions.put("class","SizeTieredCompactionStrategy");
+         localOptions.put("sstable_size_in_mb","1234"); // not for STCS
+         getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+     }
+ 
 -    public boolean verifyStrategies(WrappingCompactionStrategy wrappingStrategy, Class<? extends AbstractCompactionStrategy> expected)
++    public boolean verifyStrategies(CompactionStrategyManager manager, Class<? extends AbstractCompactionStrategy> expected)
+     {
+         boolean found = false;
 -        for (AbstractCompactionStrategy actualStrategy : wrappingStrategy.getWrappedStrategies())
++        for (AbstractCompactionStrategy actualStrategy : manager.getStrategies())
+         {
+             if (!actualStrategy.getClass().equals(expected))
+                 return false;
+             found = true;
+         }
+         return found;
+     }
+ 
 -    private ColumnFamilyStore getCurrentColumnFamilyStore()
 -    {
 -        return Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
 -    }
 -
+     private boolean minorWasTriggered(String keyspace, String cf) throws Throwable
      {
          UntypedResultSet res = execute("SELECT * FROM system.compaction_history");
          boolean minorWasTriggered = false;