You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/08/10 09:20:44 UTC
[1/3] cassandra git commit: Add new JMX methods to change local
compaction strategy
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 e389dc421 -> 929438b8b
Add new JMX methods to change local compaction strategy
Patch by marcuse; reviewed by iamaleksey for CASSANDRA-9965
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5aca7d79
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5aca7d79
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5aca7d79
Branch: refs/heads/cassandra-3.0
Commit: 5aca7d79aaf88f9c34dcae52f24bb62a28add91e
Parents: c8d163f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Aug 4 20:31:25 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Aug 10 09:02:47 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +-
.../org/apache/cassandra/config/CFMetaData.java | 10 ++-
.../apache/cassandra/db/ColumnFamilyStore.java | 35 +++++++++
.../cassandra/db/ColumnFamilyStoreMBean.java | 21 +++++
.../compaction/AbstractCompactionStrategy.java | 2 +-
.../compaction/WrappingCompactionStrategy.java | 51 +++++++++---
.../db/compaction/CompactionsCQLTest.java | 82 +++++++++++++++++++-
8 files changed, 190 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7151883..462de44 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.9
+ * Add new JMX methods to change local compaction strategy (CASSANDRA-9965)
* Write hints for paxos commits (CASSANDRA-7342)
* (cqlsh) Fix timestamps before 1970 on Windows, always
use UTC for timestamp display (CASSANDRA-10000)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 0b64e31..f6e2665 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -24,7 +24,8 @@ Upgrading
- Commit log files are no longer recycled by default, due to negative
performance implications. This can be enabled again with the
commitlog_segment_recycling option in your cassandra.yaml
-
+ - JMX methods set/getCompactionStrategyClass have been deprecated, use
+ set/getLocalCompactionStrategy/set/getLocalCompactionStrategyJson instead
2.1.8
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 4bc5f1b..2c6a30c 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1283,7 +1283,9 @@ public final class CFMetaData
return strategyClass;
}
- public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)
+ public static AbstractCompactionStrategy createCompactionStrategyInstance(Class<? extends AbstractCompactionStrategy> compactionStrategyClass,
+ ColumnFamilyStore cfs,
+ Map<String, String> compactionStrategyOptions)
{
try
{
@@ -1297,6 +1299,12 @@ public final class CFMetaData
}
}
+ @Deprecated
+ public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)
+ {
+ return createCompactionStrategyInstance(compactionStrategyClass, cfs, compactionStrategyOptions);
+ }
+
// converts CFM to thrift CfDef
public org.apache.cassandra.thrift.CfDef toThrift()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 6777e7a..f8d796e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -252,6 +252,41 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
};
}
+ public void setLocalCompactionStrategyJson(String options)
+ {
+ setLocalCompactionStrategy(FBUtilities.fromJsonMap(options));
+ }
+
+ public String getLocalCompactionStrategyJson()
+ {
+ return FBUtilities.json(getLocalCompactionStrategy());
+ }
+
+ public void setLocalCompactionStrategy(Map<String, String> options)
+ {
+ try
+ {
+ Map<String, String> optionsCopy = new HashMap<>(options);
+ Class<? extends AbstractCompactionStrategy> compactionStrategyClass = CFMetaData.createCompactionStrategy(optionsCopy.get("class"));
+ optionsCopy.remove("class");
+ CFMetaData.validateCompactionOptions(compactionStrategyClass, optionsCopy);
+ compactionStrategyWrapper.setNewLocalCompactionStrategy(compactionStrategyClass, optionsCopy);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Could not set new local compaction strategy", t);
+ // dont propagate the ConfigurationException over jmx, user will only see a ClassNotFoundException
+ throw new IllegalArgumentException("Could not set new local compaction strategy: "+t.getMessage());
+ }
+ }
+
+ public Map<String, String> getLocalCompactionStrategy()
+ {
+ Map<String, String> options = new HashMap<>(compactionStrategyWrapper.options);
+ options.put("class", compactionStrategyWrapper.getName());
+ return options;
+ }
+
public void setCompactionStrategyClass(String compactionStrategyClass)
{
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 4df593b..e292be4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -311,14 +311,35 @@ public interface ColumnFamilyStoreMBean
public void setMaximumCompactionThreshold(int threshold);
/**
+ * Sets the compaction strategy locally for this node
+ *
+ * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted
+ *
+ * @param options compaction options with the same syntax as when doing ALTER ... WITH compaction = {..}
+ */
+ public void setLocalCompactionStrategyJson(String options);
+ public String getLocalCompactionStrategyJson();
+
+ /**
+ * Sets the compaction strategy locally for this node
+ *
+ * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted
+ *
+ * @param options compaction options map
+ */
+ public void setLocalCompactionStrategy(Map<String, String> options);
+ public Map<String, String> getLocalCompactionStrategy();
+ /**
* Sets the compaction strategy by class name
* @param className the name of the compaction strategy class
*/
+ @Deprecated
public void setCompactionStrategyClass(String className);
/**
* Gets the compaction strategy class name
*/
+ @Deprecated
public String getCompactionStrategyClass();
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 73cda77..77ca404 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -60,7 +60,7 @@ public abstract class AbstractCompactionStrategy
protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
protected static final String COMPACTION_ENABLED = "enabled";
- protected Map<String, String> options;
+ public Map<String, String> options;
protected final ColumnFamilyStore cfs;
protected float tombstoneThreshold;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index 0fed733..ae67599 100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -47,6 +48,16 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
private static final Logger logger = LoggerFactory.getLogger(WrappingCompactionStrategy.class);
private volatile AbstractCompactionStrategy repaired;
private volatile AbstractCompactionStrategy unrepaired;
+ /*
+ We keep a copy of the schema compaction options and class here to be able to decide if we
+ should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
+
+ If a user changes the local compaction strategy and then later ALTERs a compaction option,
+ we will use the new compaction options.
+ */
+ private Map<String, String> schemaCompactionOptions;
+ private Class<?> schemaCompactionStrategyClass;
+
public WrappingCompactionStrategy(ColumnFamilyStore cfs)
{
super(cfs, cfs.metadata.compactionStrategyOptions);
@@ -146,10 +157,9 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
public synchronized void maybeReloadCompactionStrategy(CFMetaData metadata)
{
- if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass)
- && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass)
- && repaired.options.equals(metadata.compactionStrategyOptions)
- && unrepaired.options.equals(metadata.compactionStrategyOptions))
+ // compare the old schema configuration to the new one, ignore any locally set changes.
+ if (metadata.compactionStrategyClass.equals(schemaCompactionStrategyClass) &&
+ metadata.compactionStrategyOptions.equals(schemaCompactionOptions))
return;
reloadCompactionStrategy(metadata);
}
@@ -157,13 +167,10 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
public synchronized void reloadCompactionStrategy(CFMetaData metadata)
{
boolean disabledWithJMX = !enabled && shouldBeEnabled();
- if (repaired != null)
- repaired.shutdown();
- if (unrepaired != null)
- unrepaired.shutdown();
- repaired = metadata.createCompactionStrategyInstance(cfs);
- unrepaired = metadata.createCompactionStrategyInstance(cfs);
- options = ImmutableMap.copyOf(metadata.compactionStrategyOptions);
+ setStrategy(metadata.compactionStrategyClass, metadata.compactionStrategyOptions);
+ schemaCompactionOptions = ImmutableMap.copyOf(metadata.compactionStrategyOptions);
+ schemaCompactionStrategyClass = repaired.getClass();
+
if (disabledWithJMX || !shouldBeEnabled())
disable();
else
@@ -393,4 +400,26 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
{
return Arrays.asList(repaired, unrepaired);
}
+
+ public synchronized void setNewLocalCompactionStrategy(Class<? extends AbstractCompactionStrategy> compactionStrategyClass, Map<String, String> options)
+ {
+ logger.info("Switching local compaction strategy from {} to {} with options={}", repaired == null ? "null" : repaired.getClass(), compactionStrategyClass, options);
+ setStrategy(compactionStrategyClass, options);
+ if (shouldBeEnabled())
+ enable();
+ else
+ disable();
+ startup();
+ }
+
+ private void setStrategy(Class<? extends AbstractCompactionStrategy> compactionStrategyClass, Map<String, String> options)
+ {
+ if (repaired != null)
+ repaired.shutdown();
+ if (unrepaired != null)
+ unrepaired.shutdown();
+ repaired = CFMetaData.createCompactionStrategyInstance(compactionStrategyClass, cfs, options);
+ unrepaired = CFMetaData.createCompactionStrategyInstance(compactionStrategyClass, cfs, options);
+ this.options = ImmutableMap.copyOf(options);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5aca7d79/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 58fc062..2798689 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -17,12 +17,16 @@
*/
package org.apache.cassandra.db.compaction;
+import java.util.HashMap;
+import java.util.Map;
+
import org.junit.Test;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -141,12 +145,88 @@ public class CompactionsCQLTest extends CQLTester
assertTrue(minorWasTriggered(KEYSPACE, currentTable()));
}
+ @Test
+ public void testSetLocalCompactionStrategy() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+ Map<String, String> localOptions = new HashMap<>();
+ localOptions.put("class", "DateTieredCompactionStrategy");
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+ WrappingCompactionStrategy wrappingCompactionStrategy = (WrappingCompactionStrategy) getCurrentColumnFamilyStore().getCompactionStrategy();
+ assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class));
+ // altering something non-compaction related
+ execute("ALTER TABLE %s WITH gc_grace_seconds = 1000");
+ // should keep the local compaction strat
+ assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class));
+ // altering a compaction option
+ execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':3}");
+ // will use the new option
+ assertTrue(verifyStrategies(wrappingCompactionStrategy, SizeTieredCompactionStrategy.class));
+ }
+
+
+ @Test
+ public void testSetLocalCompactionStrategyDisable() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+ Map<String, String> localOptions = new HashMap<>();
+ localOptions.put("class", "DateTieredCompactionStrategy");
+ localOptions.put("enabled", "false");
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+ assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
+ localOptions.clear();
+ localOptions.put("class", "DateTieredCompactionStrategy");
+ // localOptions.put("enabled", "true"); - this is default!
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+ assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
+ }
+
+
+ @Test
+ public void testSetLocalCompactionStrategyEnable() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+ Map<String, String> localOptions = new HashMap<>();
+ localOptions.put("class", "DateTieredCompactionStrategy");
+
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
+
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+ assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
+
+ }
+
+
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBadLocalCompactionStrategyOptions()
+ {
+ createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+ Map<String, String> localOptions = new HashMap<>();
+ localOptions.put("class","SizeTieredCompactionStrategy");
+ localOptions.put("sstable_size_in_mb","1234"); // not for STCS
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+ }
+
+ public boolean verifyStrategies(WrappingCompactionStrategy wrappingStrategy, Class<? extends AbstractCompactionStrategy> expected)
+ {
+ boolean found = false;
+ for (AbstractCompactionStrategy actualStrategy : wrappingStrategy.getWrappedStrategies())
+ {
+ if (!actualStrategy.getClass().equals(expected))
+ return false;
+ found = true;
+ }
+ return found;
+ }
+
private ColumnFamilyStore getCurrentColumnFamilyStore()
{
return Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
}
- public boolean minorWasTriggered(String keyspace, String cf) throws Throwable
+ private boolean minorWasTriggered(String keyspace, String cf) throws Throwable
{
UntypedResultSet res = execute("SELECT * FROM system.compaction_history");
boolean minorWasTriggered = false;
[2/3] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/config/CFMetaData.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c3b967e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c3b967e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c3b967e
Branch: refs/heads/cassandra-3.0
Commit: 9c3b967e7186c1c3b6f1c25c627e770187020344
Parents: 6d0cf7d 5aca7d7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 10 09:08:39 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Aug 10 09:08:39 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +-
.../org/apache/cassandra/config/CFMetaData.java | 10 ++-
.../apache/cassandra/db/ColumnFamilyStore.java | 35 +++++++++
.../cassandra/db/ColumnFamilyStoreMBean.java | 21 +++++
.../compaction/AbstractCompactionStrategy.java | 2 +-
.../compaction/WrappingCompactionStrategy.java | 51 +++++++++---
.../db/compaction/CompactionsCQLTest.java | 82 +++++++++++++++++++-
8 files changed, 190 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a913fe7,462de44..772455c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
-2.1.9
+2.2.1
+ * Add checksum to saved cache files (CASSANDRA-9265)
+ * Log warning when using an aggregate without partition key (CASSANDRA-9737)
+ * Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
+ * UDF / UDA execution time in trace (CASSANDRA-9723)
+ * Fix broken internode SSL (CASSANDRA-9884)
+Merged from 2.1:
+ * Add new JMX methods to change local compaction strategy (CASSANDRA-9965)
* Write hints for paxos commits (CASSANDRA-7342)
* (cqlsh) Fix timestamps before 1970 on Windows, always
use UTC for timestamp display (CASSANDRA-10000)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/NEWS.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index d8eeaf2,2c6a30c..6468973
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -876,6 -1299,57 +878,12 @@@ public final class CFMetaDat
}
}
+ @Deprecated
+ public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)
+ {
+ return createCompactionStrategyInstance(compactionStrategyClass, cfs, compactionStrategyOptions);
+ }
+
- // converts CFM to thrift CfDef
- public org.apache.cassandra.thrift.CfDef toThrift()
- {
- org.apache.cassandra.thrift.CfDef def = new org.apache.cassandra.thrift.CfDef(ksName, cfName);
- def.setColumn_type(cfType.name());
-
- if (isSuper())
- {
- def.setComparator_type(comparator.subtype(0).toString());
- def.setSubcomparator_type(comparator.subtype(1).toString());
- }
- else
- {
- def.setComparator_type(comparator.toString());
- }
-
- def.setComment(Strings.nullToEmpty(comment));
- def.setRead_repair_chance(readRepairChance);
- def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
- def.setGc_grace_seconds(gcGraceSeconds);
- def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
- def.setKey_validation_class(keyValidator.toString());
- def.setMin_compaction_threshold(minCompactionThreshold);
- def.setMax_compaction_threshold(maxCompactionThreshold);
- // We only return the alias if only one is set since thrift don't know about multiple key aliases
- if (partitionKeyColumns.size() == 1)
- def.setKey_alias(partitionKeyColumns.get(0).name.bytes);
- def.setColumn_metadata(ColumnDefinition.toThrift(columnMetadata));
- def.setCompaction_strategy(compactionStrategyClass.getName());
- def.setCompaction_strategy_options(new HashMap<>(compactionStrategyOptions));
- def.setCompression_options(compressionParameters.asThriftOptions());
- if (bloomFilterFpChance != null)
- def.setBloom_filter_fp_chance(bloomFilterFpChance);
- def.setMin_index_interval(minIndexInterval);
- def.setMax_index_interval(maxIndexInterval);
- def.setMemtable_flush_period_in_ms(memtableFlushPeriod);
- def.setCaching(caching.toThriftCaching());
- def.setCells_per_row_to_cache(caching.toThriftCellsPerRow());
- def.setDefault_time_to_live(defaultTimeToLive);
- def.setSpeculative_retry(speculativeRetry.toString());
- def.setTriggers(TriggerDefinition.toThrift(triggers));
-
- return def;
- }
-
/**
* Returns the ColumnDefinition for {@code name}.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3b967e/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
[3/3] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Conflicts:
src/java/org/apache/cassandra/config/CFMetaData.java
src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/929438b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/929438b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/929438b8
Branch: refs/heads/cassandra-3.0
Commit: 929438b8be32e38f6d921bfdc4a011cd526dfeb7
Parents: e389dc4 9c3b967
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 10 09:12:52 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Aug 10 09:13:01 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 3 +-
.../org/apache/cassandra/config/CFMetaData.java | 7 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 29 ++++++-
.../cassandra/db/ColumnFamilyStoreMBean.java | 19 +++--
.../compaction/CompactionStrategyManager.java | 51 +++++++++---
.../db/compaction/CompactionsCQLTest.java | 81 +++++++++++++++++++-
7 files changed, 166 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7cb9f16,772455c..639dd59
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -19,39 -13,6 +19,40 @@@ Merged from 2.1
when both exist (CASSANDRA-9777)
* Release snapshot selfRef when doing snapshot repair (CASSANDRA-9998)
* Cannot replace token does not exist - DN node removed as Fat Client (CASSANDRA-9871)
+Merged from 2.0:
+ * Don't cast expected bf size to an int (CASSANDRA-9959)
+
+
+3.0.0-alpha1
+ * Implement proper sandboxing for UDFs (CASSANDRA-9402)
+ * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)
+ * Allow extra schema definitions in cassandra-stress yaml (CASSANDRA-9850)
+ * Metrics should use up to date nomenclature (CASSANDRA-9448)
+ * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)
+ * Cleanup crc and adler code for java 8 (CASSANDRA-9650)
+ * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808, 9825,
+ 9848, 9705, 9859, 9867, 9874, 9828, 9801)
+ * Update Guava to 18.0 (CASSANDRA-9653)
+ * Bloom filter false positive ratio is not honoured (CASSANDRA-8413)
+ * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522)
+ * Change hinted_handoff_enabled yaml setting, JMX (CASSANDRA-9035)
+ * Add algorithmic token allocation (CASSANDRA-7032)
+ * Add nodetool command to replay batchlog (CASSANDRA-9547)
+ * Make file buffer cache independent of paths being read (CASSANDRA-8897)
+ * Remove deprecated legacy Hadoop code (CASSANDRA-9353)
+ * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801)
+ * Change gossip stabilization to use endpoit size (CASSANDRA-9401)
+ * Change default garbage collector to G1 (CASSANDRA-7486)
+ * Populate TokenMetadata early during startup (CASSANDRA-9317)
+ * Undeprecate cache recentHitRate (CASSANDRA-6591)
+ * Add support for selectively varint encoding fields (CASSANDRA-9499, 9865)
+ * Materialized Views (CASSANDRA-6477)
+Merged from 2.2:
+ * Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
+ * UDF / UDA execution time in trace (CASSANDRA-9723)
+ * Fix broken internode SSL (CASSANDRA-9884)
+Merged from 2.1:
++ * Add new JMX methods to change local compaction strategy (CASSANDRA-9965)
* Fix handling of enable/disable autocompaction (CASSANDRA-9899)
* Add consistency level to tracing ouput (CASSANDRA-9827)
* Remove repair snapshot leftover on startup (CASSANDRA-7357)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index 0fa4ded,ccc5cc8..5a690bd
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,55 -13,6 +13,56 @@@ restore snapshots created with the prev
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.
+3.0
+===
+
+New features
+------------
+ - Materialized Views, which allow for server-side denormalization, is now
+ available. Materialized views provide an alternative to secondary indexes
+ for non-primary key queries, and perform much better for indexing high
+ cardinality columns.
+ See http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views
+
+
+Upgrading
+---------
+ - 3.0 requires Java 8u20 or later.
+ - The default JVM GC has been changed to G1GC.
+ - The default JVM flag -XX:+PerfDisableSharedMem will cause the following tools JVM
+ to stop working: jps, jstack, jinfo, jmc, jcmd as well as 3rd party tools like Jolokia.
+ If you wish to use these tools you can comment this flag out in cassandra-env.{sh,ps1}
+ - New write stages have been added for batchlog and materialized view mutations
+ you can set their size in cassandra.yaml
+ - User defined functions are now executed in a sandbox.
+ To use UDFs and UDAs, you have to enable them in cassandra.yaml.
+ - New SSTable version 'la' with improved bloom-filter false-positive handling
+ compared to previous version 'ka' used in 2.2 and 2.1. Running sstableupgrade
+ is not necessary but recommended.
+ - Before upgrading to 3.0, make sure that your cluster is in complete agreement
+ (schema versions outputted by `nodetool describecluster` are all the same).
+ - Schema metadata is now stored in the new `system_schema` keyspace, and
+ legacy `system.schema_*` tables are now gone; see CASSANDRA-6717 for details.
+ - Pig's CassandraStorage has been removed. Use CqlNativeStorage instead.
+ - Hadoop BulkOutputFormat and BulkRecordWriter have been removed; use
+ CqlBulkOutputFormat and CqlBulkRecordWriter instead.
+ - Hadoop ColumnFamilyInputFormat and ColumnFamilyOutputFormat have been removed;
+ use CqlInputFormat and CqlOutputFormat instead.
+ - Hadoop ColumnFamilyRecordReader and ColumnFamilyRecordWriter have been removed;
+ use CqlRecordReader and CqlRecordWriter instead.
+ - hinted_handoff_enabled in cassandra.yaml no longer supports a list of data centers.
+ To specify a list of excluded data centers when hinted_handoff_enabled is set to true,
+ use hinted_handoff_disabled_datacenters, see CASSANDRA-9035 for details.
+ - The `sstable_compression` and `chunk_length_kb` compression options have been deprecated.
+ The new options are `class` and `chunk_length_in_kb`. Disabling compression should now
+ be done by setting the new option `enabled` to `false`.
+ - Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax
+ has been deprecated since 2.1.0 and is being removed in 3.0.0.
+ - Batchlog entries are now stored in a new table - system.batches.
+ The old one has been deprecated.
-
++ - JMX methods set/getCompactionStrategyClass have been removed, use
++ set/getLocalCompactionStrategy or set/getLocalCompactionStrategyJson instead.
+
2.2
===
http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 1d38274,6468973..7719587
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -772,25 -819,60 +772,26 @@@ public final class CFMetaDat
throw new ConfigurationException(String.format("Column family comparators do not match or are not compatible (found %s; expected %s).", cfm.comparator.getClass().getSimpleName(), comparator.getClass().getSimpleName()));
}
- public static void validateCompactionOptions(Class<? extends AbstractCompactionStrategy> strategyClass, Map<String, String> options) throws ConfigurationException
+ public static Class<? extends AbstractCompactionStrategy> createCompactionStrategy(String className) throws ConfigurationException
+ {
+ className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className;
+ Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy");
+ if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass))
+ throw new ConfigurationException(String.format("Specified compaction strategy class (%s) is not derived from AbstractReplicationStrategy", className));
+
+ return strategyClass;
+ }
+
- public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs)
++ public static AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs,
++ CompactionParams compactionParams)
{
try
{
- if (options == null)
- return;
-
- Map<?,?> unknownOptions = (Map) strategyClass.getMethod("validateOptions", Map.class).invoke(null, options);
- if (!unknownOptions.isEmpty())
- throw new ConfigurationException(String.format("Properties specified %s are not understood by %s", unknownOptions.keySet(), strategyClass.getSimpleName()));
+ Constructor<? extends AbstractCompactionStrategy> constructor =
- params.compaction.klass().getConstructor(ColumnFamilyStore.class, Map.class);
- return constructor.newInstance(cfs, params.compaction.options());
++ compactionParams.klass().getConstructor(ColumnFamilyStore.class, Map.class);
++ return constructor.newInstance(cfs, compactionParams.options());
}
- catch (NoSuchMethodException e)
- {
- logger.warn("Compaction Strategy {} does not have a static validateOptions method. Validation ignored", strategyClass.getName());
- }
- catch (InvocationTargetException e)
- {
- if (e.getTargetException() instanceof ConfigurationException)
- throw (ConfigurationException) e.getTargetException();
- throw new ConfigurationException("Failed to validate compaction options: " + options);
- }
- catch (ConfigurationException e)
- {
- throw e;
- }
- catch (Exception e)
- {
- throw new ConfigurationException("Failed to validate compaction options: " + options);
- }
- }
-
- public static Class<? extends AbstractCompactionStrategy> createCompactionStrategy(String className) throws ConfigurationException
- {
- className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className;
- Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy");
- if (className.equals(WrappingCompactionStrategy.class.getName()))
- throw new ConfigurationException("You can't set WrappingCompactionStrategy as the compaction strategy!");
- if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass))
- throw new ConfigurationException(String.format("Specified compaction strategy class (%s) is not derived from AbstractReplicationStrategy", className));
-
- return strategyClass;
- }
-
- public static AbstractCompactionStrategy createCompactionStrategyInstance(Class<? extends AbstractCompactionStrategy> compactionStrategyClass,
- ColumnFamilyStore cfs,
- Map<String, String> compactionStrategyOptions)
- {
- try
- {
- Constructor<? extends AbstractCompactionStrategy> constructor =
- compactionStrategyClass.getConstructor(ColumnFamilyStore.class, Map.class);
- return constructor.newInstance(cfs, compactionStrategyOptions);
- }
- catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e)
+ catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e)
{
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index beb2b93,6b71be9..4ae6694
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -248,14 -259,57 +248,35 @@@ public class ColumnFamilyStore implemen
};
}
- public void setCompactionStrategyClass(String compactionStrategyClass)
+ public void setLocalCompactionStrategyJson(String options)
{
- throw new UnsupportedOperationException("ColumnFamilyStore.setCompactionStrategyClass() method is no longer supported");
+ setLocalCompactionStrategy(FBUtilities.fromJsonMap(options));
}
- public String getCompactionStrategyClass()
+ public String getLocalCompactionStrategyJson()
{
- return metadata.params.compaction.klass().getName();
+ return FBUtilities.json(getLocalCompactionStrategy());
+ }
+
+ public void setLocalCompactionStrategy(Map<String, String> options)
+ {
+ try
+ {
- Map<String, String> optionsCopy = new HashMap<>(options);
- Class<? extends AbstractCompactionStrategy> compactionStrategyClass = CFMetaData.createCompactionStrategy(optionsCopy.get("class"));
- optionsCopy.remove("class");
- CFMetaData.validateCompactionOptions(compactionStrategyClass, optionsCopy);
- compactionStrategyWrapper.setNewLocalCompactionStrategy(compactionStrategyClass, optionsCopy);
++ CompactionParams compactionParams = CompactionParams.fromMap(options);
++ compactionParams.validate();
++ compactionStrategyManager.setNewLocalCompactionStrategy(compactionParams);
+ }
+ catch (Throwable t)
+ {
+ logger.error("Could not set new local compaction strategy", t);
+ // dont propagate the ConfigurationException over jmx, user will only see a ClassNotFoundException
+ throw new IllegalArgumentException("Could not set new local compaction strategy: "+t.getMessage());
+ }
+ }
+
+ public Map<String, String> getLocalCompactionStrategy()
+ {
- Map<String, String> options = new HashMap<>(compactionStrategyWrapper.options);
- options.put("class", compactionStrategyWrapper.getName());
- return options;
- }
-
- public void setCompactionStrategyClass(String compactionStrategyClass)
- {
- try
- {
- metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass);
- compactionStrategyWrapper.maybeReloadCompactionStrategy(metadata);
- }
- catch (ConfigurationException e)
- {
- throw new IllegalArgumentException(e.getMessage());
- }
- }
-
- public String getCompactionStrategyClass()
- {
- return metadata.compactionStrategyClass.getName();
++ return compactionStrategyManager.getCompactionParams().asMap();
}
public Map<String,String> getCompressionParameters()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index c23df74,1a8ba1d..84c6dd1
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@@ -70,15 -67,36 +70,24 @@@ public interface ColumnFamilyStoreMBea
public void setMaximumCompactionThreshold(int threshold);
/**
- * Sets the compaction strategy by class name
- * @param className the name of the compaction strategy class
+ * Sets the compaction strategy locally for this node
+ *
+ * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted
+ *
+ * @param options compaction options with the same syntax as when doing ALTER ... WITH compaction = {..}
*/
- public void setCompactionStrategyClass(String className);
+ public void setLocalCompactionStrategyJson(String options);
+ public String getLocalCompactionStrategyJson();
/**
- * Gets the compaction strategy class name
+ * Sets the compaction strategy locally for this node
+ *
+ * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted
+ *
+ * @param options compaction options map
*/
- public String getCompactionStrategyClass();
+ public void setLocalCompactionStrategy(Map<String, String> options);
+ public Map<String, String> getLocalCompactionStrategy();
- /**
- * Sets the compaction strategy by class name
- * @param className the name of the compaction strategy class
- */
- @Deprecated
- public void setCompactionStrategyClass(String className);
-
- /**
- * Gets the compaction strategy class name
- */
- @Deprecated
- public String getCompactionStrategyClass();
/**
* Get the compression parameters
http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 4f6dfa2,0000000..7204da0
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -1,453 -1,0 +1,482 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+
+import java.util.*;
+import java.util.concurrent.Callable;
+
+import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.notifications.*;
+import org.apache.cassandra.schema.CompactionParams;
+
+/**
+ * Manages the compaction strategies.
+ *
+ * Currently has two instances of actual compaction strategies - one for repaired data and one for
+ * unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ */
+public class CompactionStrategyManager implements INotificationConsumer
+{
+ private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
+ private final ColumnFamilyStore cfs;
+ private volatile AbstractCompactionStrategy repaired;
+ private volatile AbstractCompactionStrategy unrepaired;
+ private volatile boolean enabled = true;
+ public boolean isActive = true;
+ private volatile CompactionParams params;
++ /*
++ We keep a copy of the schema compaction parameters here to be able to decide if we
++ should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
++
++ If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
++ we will use the new compaction parameters.
++ */
++ private CompactionParams schemaCompactionParams;
+
+ public CompactionStrategyManager(ColumnFamilyStore cfs)
+ {
+ cfs.getTracker().subscribe(this);
+ logger.debug("{} subscribed to the data tracker.", this);
+ this.cfs = cfs;
+ reload(cfs.metadata);
+ params = cfs.metadata.params.compaction;
+ enabled = params.isEnabled();
+ }
+
+ /**
+ * Return the next background task
+ *
+ * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
+ *
+ */
+ public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ {
+ if (!isEnabled())
+ return null;
+
+ maybeReload(cfs.metadata);
+
+ if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
+ {
+ AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
+ if (repairedTask != null)
+ return repairedTask;
+ return unrepaired.getNextBackgroundTask(gcBefore);
+ }
+ else
+ {
+ AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
+ if (unrepairedTask != null)
+ return unrepairedTask;
+ return repaired.getNextBackgroundTask(gcBefore);
+ }
+ }
+
+ public boolean isEnabled()
+ {
+ return enabled && isActive;
+ }
+
+ public synchronized void resume()
+ {
+ isActive = true;
+ }
+
+ /**
+ * pause compaction while we cancel all ongoing compactions
+ *
+ * Separate call from enable/disable to not have to save the enabled-state externally
+ */
+ public synchronized void pause()
+ {
+ isActive = false;
+ }
+
+
+ private void startup()
+ {
+ for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+ {
+ if (sstable.openReason != SSTableReader.OpenReason.EARLY)
+ getCompactionStrategyFor(sstable).addSSTable(sstable);
+ }
+ repaired.startup();
+ unrepaired.startup();
+ }
+
+ /**
+ * return the compaction strategy for the given sstable
+ *
+ * returns differently based on the repaired status
+ * @param sstable
+ * @return
+ */
+ private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+ {
+ if (sstable.isRepaired())
+ return repaired;
+ else
+ return unrepaired;
+ }
+
+ public void shutdown()
+ {
+ isActive = false;
+ repaired.shutdown();
+ unrepaired.shutdown();
+ }
+
+ public synchronized void maybeReload(CFMetaData metadata)
+ {
- if (repaired != null && repaired.getClass().equals(metadata.params.compaction.klass())
- && unrepaired != null && unrepaired.getClass().equals(metadata.params.compaction.klass())
- && repaired.options.equals(metadata.params.compaction.options()) // todo: assumes all have the same options
- && unrepaired.options.equals(metadata.params.compaction.options()))
++ // compare the old schema configuration to the new one, ignore any locally set changes.
++ if (metadata.params.compaction.equals(schemaCompactionParams))
+ return;
+ reload(metadata);
+ }
+
+ /**
+ * Reload the compaction strategies
+ *
+ * Called after changing configuration and at startup.
+ * @param metadata
+ */
+ public synchronized void reload(CFMetaData metadata)
+ {
+ boolean disabledWithJMX = !enabled && shouldBeEnabled();
- if (repaired != null)
- repaired.shutdown();
- if (unrepaired != null)
- unrepaired.shutdown();
- repaired = metadata.createCompactionStrategyInstance(cfs);
- unrepaired = metadata.createCompactionStrategyInstance(cfs);
- params = metadata.params.compaction;
++ setStrategy(metadata.params.compaction);
++ schemaCompactionParams = metadata.params.compaction;
++
+ if (disabledWithJMX || !shouldBeEnabled())
+ disable();
+ else
+ enable();
+ startup();
+ }
+
+ public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ {
+ cfs.getTracker().replaceFlushed(memtable, sstable);
+ if (sstable != null)
+ CompactionManager.instance.submitBackground(cfs);
+ }
+
+ public int getUnleveledSSTables()
+ {
+ if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+ {
+ int count = 0;
+ count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
+ count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+ return count;
+ }
+ return 0;
+ }
+
+ public synchronized int[] getSSTableCountPerLevel()
+ {
+ if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+ {
+ int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+ int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
+ res = sumArrays(res, repairedCountPerLevel);
+ int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
+ res = sumArrays(res, unrepairedCountPerLevel);
+ return res;
+ }
+ return null;
+ }
+
+ private static int[] sumArrays(int[] a, int[] b)
+ {
+ int[] res = new int[Math.max(a.length, b.length)];
+ for (int i = 0; i < res.length; i++)
+ {
+ if (i < a.length && i < b.length)
+ res[i] = a[i] + b[i];
+ else if (i < a.length)
+ res[i] = a[i];
+ else
+ res[i] = b[i];
+ }
+ return res;
+ }
+
+ public boolean shouldDefragment()
+ {
+ assert repaired.getClass().equals(unrepaired.getClass());
+ return repaired.shouldDefragment();
+ }
+
+
+ public synchronized void handleNotification(INotification notification, Object sender)
+ {
+ if (notification instanceof SSTableAddedNotification)
+ {
+ SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
+ if (flushedNotification.added.isRepaired())
+ repaired.addSSTable(flushedNotification.added);
+ else
+ unrepaired.addSSTable(flushedNotification.added);
+ }
+ else if (notification instanceof SSTableListChangedNotification)
+ {
+ SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+ Set<SSTableReader> repairedRemoved = new HashSet<>();
+ Set<SSTableReader> repairedAdded = new HashSet<>();
+ Set<SSTableReader> unrepairedRemoved = new HashSet<>();
+ Set<SSTableReader> unrepairedAdded = new HashSet<>();
+
+ for (SSTableReader sstable : listChangedNotification.removed)
+ {
+ if (sstable.isRepaired())
+ repairedRemoved.add(sstable);
+ else
+ unrepairedRemoved.add(sstable);
+ }
+ for (SSTableReader sstable : listChangedNotification.added)
+ {
+ if (sstable.isRepaired())
+ repairedAdded.add(sstable);
+ else
+ unrepairedAdded.add(sstable);
+ }
+ if (!repairedRemoved.isEmpty())
+ {
+ repaired.replaceSSTables(repairedRemoved, repairedAdded);
+ }
+ else
+ {
+ for (SSTableReader sstable : repairedAdded)
+ repaired.addSSTable(sstable);
+ }
+
+ if (!unrepairedRemoved.isEmpty())
+ {
+ unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
+ }
+ else
+ {
+ for (SSTableReader sstable : unrepairedAdded)
+ unrepaired.addSSTable(sstable);
+ }
+ }
+ else if (notification instanceof SSTableRepairStatusChanged)
+ {
+ for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
+ {
+ if (sstable.isRepaired())
+ {
+ unrepaired.removeSSTable(sstable);
+ repaired.addSSTable(sstable);
+ }
+ else
+ {
+ repaired.removeSSTable(sstable);
+ unrepaired.addSSTable(sstable);
+ }
+ }
+ }
+ else if (notification instanceof SSTableDeletingNotification)
+ {
+ SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
+ if (sstable.isRepaired())
+ repaired.removeSSTable(sstable);
+ else
+ unrepaired.removeSSTable(sstable);
+ }
+ }
+
+ public void enable()
+ {
+ if (repaired != null)
+ repaired.enable();
+ if (unrepaired != null)
+ unrepaired.enable();
+ // enable this last to make sure the strategies are ready to get calls.
+ enabled = true;
+ }
+
+ public void disable()
+ {
+ // disable this first avoid asking disabled strategies for compaction tasks
+ enabled = false;
+ if (repaired != null)
+ repaired.disable();
+ if (unrepaired != null)
+ unrepaired.disable();
+ }
+
+ /**
+ * Create ISSTableScanner from the given sstables
+ *
+ * Delegates the call to the compaction strategies to allow LCS to create a scanner
+ * @param sstables
+ * @param range
+ * @return
+ */
+ @SuppressWarnings("resource")
+ public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
+ {
+ List<SSTableReader> repairedSSTables = new ArrayList<>();
+ List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable.isRepaired())
+ repairedSSTables.add(sstable);
+ else
+ unrepairedSSTables.add(sstable);
+ }
+
+ Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
+
+ for (Range<Token> range : ranges)
+ {
+ AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
+ AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+
+ for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
+ {
+ if (!scanners.add(scanner))
+ scanner.close();
+ }
+ }
+
+ return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
+ }
+
+ public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
+ {
+ return getScanners(sstables, Collections.singleton(null));
+ }
+
+ public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
+ {
+ return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
+ }
+
+ public long getMaxSSTableBytes()
+ {
+ return unrepaired.getMaxSSTableBytes();
+ }
+
+ public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
+ {
+ return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+ }
+
+ public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
+ {
+ // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
+ // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
+ // sstables are marked the compactions are re-enabled
+ return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
+ {
+ @Override
+ public Collection<AbstractCompactionTask> call() throws Exception
+ {
+ synchronized (CompactionStrategyManager.this)
+ {
+ Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
+ Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
+
+ if (repairedTasks == null && unrepairedTasks == null)
+ return null;
+
+ if (repairedTasks == null)
+ return unrepairedTasks;
+ if (unrepairedTasks == null)
+ return repairedTasks;
+
+ List<AbstractCompactionTask> tasks = new ArrayList<>();
+ tasks.addAll(repairedTasks);
+ tasks.addAll(unrepairedTasks);
+ return tasks;
+ }
+ }
+ }, false, false);
+ }
+
+ public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+ }
+
+ public int getEstimatedRemainingTasks()
+ {
+ int tasks = 0;
+ tasks += repaired.getEstimatedRemainingTasks();
+ tasks += unrepaired.getEstimatedRemainingTasks();
+
+ return tasks;
+ }
+
+ public boolean shouldBeEnabled()
+ {
+ return params.isEnabled();
+ }
+
+ public String getName()
+ {
+ return unrepaired.getName();
+ }
+
+ public List<AbstractCompactionStrategy> getStrategies()
+ {
+ return Arrays.asList(repaired, unrepaired);
+ }
++
++ public synchronized void setNewLocalCompactionStrategy(CompactionParams params)
++ {
++ logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
++ setStrategy(params);
++ if (shouldBeEnabled())
++ enable();
++ else
++ disable();
++ startup();
++ }
++
++ private void setStrategy(CompactionParams params)
++ {
++ if (repaired != null)
++ repaired.shutdown();
++ if (unrepaired != null)
++ unrepaired.shutdown();
++ repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
++ unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
++ this.params = params;
++ }
++
++ public CompactionParams getCompactionParams()
++ {
++ return params;
++ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 0db231e,2798689..63b21df
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@@ -141,7 -145,88 +145,82 @@@ public class CompactionsCQLTest extend
assertTrue(minorWasTriggered(KEYSPACE, currentTable()));
}
- public boolean minorWasTriggered(String keyspace, String cf) throws Throwable
+ @Test
+ public void testSetLocalCompactionStrategy() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+ Map<String, String> localOptions = new HashMap<>();
+ localOptions.put("class", "DateTieredCompactionStrategy");
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
- WrappingCompactionStrategy wrappingCompactionStrategy = (WrappingCompactionStrategy) getCurrentColumnFamilyStore().getCompactionStrategy();
- assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class));
++ assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+ // altering something non-compaction related
+ execute("ALTER TABLE %s WITH gc_grace_seconds = 1000");
+ // should keep the local compaction strat
- assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class));
++ assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+ // altering a compaction option
+ execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':3}");
+ // will use the new option
- assertTrue(verifyStrategies(wrappingCompactionStrategy, SizeTieredCompactionStrategy.class));
++ assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), SizeTieredCompactionStrategy.class));
+ }
+
+
+ @Test
+ public void testSetLocalCompactionStrategyDisable() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+ Map<String, String> localOptions = new HashMap<>();
+ localOptions.put("class", "DateTieredCompactionStrategy");
+ localOptions.put("enabled", "false");
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
- assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++ assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+ localOptions.clear();
+ localOptions.put("class", "DateTieredCompactionStrategy");
+ // localOptions.put("enabled", "true"); - this is default!
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
- assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++ assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+ }
+
+
+ @Test
+ public void testSetLocalCompactionStrategyEnable() throws Throwable
+ {
+ createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+ Map<String, String> localOptions = new HashMap<>();
+ localOptions.put("class", "DateTieredCompactionStrategy");
+
+ getCurrentColumnFamilyStore().disableAutoCompaction();
- assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++ assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
- assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled());
++ assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
+ }
+
+
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBadLocalCompactionStrategyOptions()
+ {
+ createTable("CREATE TABLE %s (id text PRIMARY KEY)");
+ Map<String, String> localOptions = new HashMap<>();
+ localOptions.put("class","SizeTieredCompactionStrategy");
+ localOptions.put("sstable_size_in_mb","1234"); // not for STCS
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions);
+ }
+
- public boolean verifyStrategies(WrappingCompactionStrategy wrappingStrategy, Class<? extends AbstractCompactionStrategy> expected)
++ public boolean verifyStrategies(CompactionStrategyManager manager, Class<? extends AbstractCompactionStrategy> expected)
+ {
+ boolean found = false;
- for (AbstractCompactionStrategy actualStrategy : wrappingStrategy.getWrappedStrategies())
++ for (AbstractCompactionStrategy actualStrategy : manager.getStrategies())
+ {
+ if (!actualStrategy.getClass().equals(expected))
+ return false;
+ found = true;
+ }
+ return found;
+ }
+
- private ColumnFamilyStore getCurrentColumnFamilyStore()
- {
- return Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable());
- }
-
+ private boolean minorWasTriggered(String keyspace, String cf) throws Throwable
{
UntypedResultSet res = execute("SELECT * FROM system.compaction_history");
boolean minorWasTriggered = false;