You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/07/29 17:12:52 UTC
[1/3] cassandra git commit: Fix handling of enable/disable
autocompaction.
Repository: cassandra
Updated Branches:
refs/heads/trunk 3e75d5a62 -> d26187e5c
Fix handling of enable/disable autocompaction.
Patch by marcuse; reviewed by Jeremiah Jordan for CASSANDRA-9899
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dce303bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dce303bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dce303bc
Branch: refs/heads/trunk
Commit: dce303bcb5748d302448769177a245a30ec3cc19
Parents: 7395207
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jul 27 09:31:08 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jul 29 15:58:21 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compaction/AbstractCompactionStrategy.java | 2 +-
.../DateTieredCompactionStrategy.java | 5 +---
.../compaction/LeveledCompactionStrategy.java | 2 --
.../SizeTieredCompactionStrategy.java | 6 ----
.../compaction/WrappingCompactionStrategy.java | 30 +++++++++++++++++++-
6 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c4bb21c..4ef77ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.9
+ * Fix handling of enable/disable autocompaction (CASSANDRA-9899)
* Commit log segment recycling is disabled by default (CASSANDRA-9896)
* Add consistency level to tracing ouput (CASSANDRA-9827)
* Fix MarshalException when upgrading superColumn family (CASSANDRA-9582)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 9eac2d0..73cda77 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -60,7 +60,7 @@ public abstract class AbstractCompactionStrategy
protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
protected static final String COMPACTION_ENABLED = "enabled";
- public final Map<String, String> options;
+ protected Map<String, String> options;
protected final ColumnFamilyStore cfs;
protected float tombstoneThreshold;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 15287bd..dec0cef 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -57,9 +57,6 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
@Override
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
- if (!isEnabled())
- return null;
-
while (true)
{
List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
@@ -79,7 +76,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
*/
private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
- if (!isEnabled() || cfs.getSSTables().isEmpty())
+ if (cfs.getSSTables().isEmpty())
return Collections.emptyList();
Set<SSTableReader> uncompacting = Sets.intersection(sstables, cfs.getUncompactingSSTables());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index f9e5d16..8afe6b6 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -87,8 +87,6 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
*/
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
- if (!isEnabled())
- return null;
Collection<AbstractCompactionTask> tasks = getMaximalTask(gcBefore);
if (tasks == null || tasks.size() == 0)
return null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 93484e8..1f4acdb 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -75,9 +75,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
- if (!isEnabled())
- return Collections.emptyList();
-
// make local copies so they can't be changed out from under us mid-method
int minThreshold = cfs.getMinimumCompactionThreshold();
int maxThreshold = cfs.getMaximumCompactionThreshold();
@@ -177,9 +174,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
- if (!isEnabled())
- return null;
-
while (true)
{
List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index 2f8bd7c..e01b4c6 100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
+import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -150,18 +151,23 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
&& repaired.options.equals(metadata.compactionStrategyOptions)
&& unrepaired.options.equals(metadata.compactionStrategyOptions))
return;
-
reloadCompactionStrategy(metadata);
}
public synchronized void reloadCompactionStrategy(CFMetaData metadata)
{
+ boolean disabledWithJMX = !isEnabled() && shouldBeEnabled();
if (repaired != null)
repaired.shutdown();
if (unrepaired != null)
unrepaired.shutdown();
repaired = metadata.createCompactionStrategyInstance(cfs);
unrepaired = metadata.createCompactionStrategyInstance(cfs);
+ options = ImmutableMap.copyOf(metadata.compactionStrategyOptions);
+ if (disabledWithJMX || !shouldBeEnabled())
+ disable();
+ else
+ enable();
startup();
}
@@ -344,6 +350,28 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
}
@Override
+ public void enable()
+ {
+ if (repaired != null)
+ repaired.enable();
+ if (unrepaired != null)
+ unrepaired.enable();
+ // enable this last to make sure the strategies are ready to get calls.
+ super.enable();
+ }
+
+ @Override
+ public void disable()
+ {
+ // disable this first avoid asking disabled strategies for compaction tasks
+ super.disable();
+ if (repaired != null)
+ repaired.disable();
+ if (unrepaired != null)
+ unrepaired.disable();
+ }
+
+ @Override
public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
List<SSTableReader> repairedSSTables = new ArrayList<>();
[3/3] cassandra git commit: Merge branch 'cassandra-2.2' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d26187e5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d26187e5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d26187e5
Branch: refs/heads/trunk
Commit: d26187e5c2ab2cf08d8c874387b4674b860f5e4d
Parents: 3e75d5a 325aeb7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Jul 29 17:04:15 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jul 29 17:04:15 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compaction/AbstractCompactionStrategy.java | 11 +----
.../compaction/CompactionStrategyManager.java | 49 +++++++++++++-------
.../DateTieredCompactionStrategy.java | 5 +-
.../compaction/LeveledCompactionStrategy.java | 3 --
.../SizeTieredCompactionStrategy.java | 6 ---
6 files changed, 34 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 6598286,05dffc8..379d3de
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -85,8 -81,8 +85,6 @@@ public abstract class AbstractCompactio
*/
protected boolean isActive = false;
-- protected volatile boolean enabled = true;
--
protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
assert cfs != null;
@@@ -195,19 -191,19 +193,12 @@@
*/
public abstract long getMaxSSTableBytes();
-- public boolean isEnabled()
-- {
-- return this.enabled && this.isActive;
-- }
--
public void enable()
{
-- this.enabled = true;
}
public void disable()
{
-- this.enabled = false;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 8ec7071,0000000..766eb1b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -1,448 -1,0 +1,461 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
++import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
++import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableDeletingNotification;
+import org.apache.cassandra.notifications.SSTableListChangedNotification;
+import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+
+/**
+ * Manages the compaction strategies.
+ *
+ * Currently has two instances of actual compaction strategies - one for repaired data and one for
+ * unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ */
+public class CompactionStrategyManager implements INotificationConsumer
+{
+ protected static final String COMPACTION_ENABLED = "enabled";
+ private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
+ private final ColumnFamilyStore cfs;
+ private volatile AbstractCompactionStrategy repaired;
+ private volatile AbstractCompactionStrategy unrepaired;
+ private volatile boolean enabled = true;
+ public boolean isActive = true;
++ private Map<String, String> options;
+
+ public CompactionStrategyManager(ColumnFamilyStore cfs)
+ {
+ cfs.getTracker().subscribe(this);
+ logger.debug("{} subscribed to the data tracker.", this);
+ this.cfs = cfs;
+ reload(cfs.metadata);
+ String optionValue = cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED);
+ enabled = optionValue == null || Boolean.parseBoolean(optionValue);
++ options = ImmutableMap.copyOf(cfs.metadata.compactionStrategyOptions);
+ }
+
+ /**
+ * Return the next background task
+ *
+ * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
+ *
+ */
+ public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+ {
+ if (!isEnabled())
+ return null;
+
+ maybeReload(cfs.metadata);
+
+ if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
+ {
+ AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
+ if (repairedTask != null)
+ return repairedTask;
+ return unrepaired.getNextBackgroundTask(gcBefore);
+ }
+ else
+ {
+ AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
+ if (unrepairedTask != null)
+ return unrepairedTask;
+ return repaired.getNextBackgroundTask(gcBefore);
+ }
+ }
+
- /**
- * Disable compaction - used with nodetool disableautocompaction for example
- */
- public void disable()
- {
- enabled = false;
- }
-
- /**
- * re-enable disabled compaction.
- */
- public void enable()
- {
- enabled = true;
- }
-
+ public boolean isEnabled()
+ {
+ return enabled && isActive;
+ }
+
+ public synchronized void resume()
+ {
+ isActive = true;
+ }
+
+ /**
+ * pause compaction while we cancel all ongoing compactions
+ *
+ * Separate call from enable/disable to not have to save the enabled-state externally
+ */
+ public synchronized void pause()
+ {
+ isActive = false;
+ }
+
+
+ private void startup()
+ {
+ for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+ {
+ if (sstable.openReason != SSTableReader.OpenReason.EARLY)
+ getCompactionStrategyFor(sstable).addSSTable(sstable);
+ }
+ repaired.startup();
+ unrepaired.startup();
+ }
+
+ /**
+ * return the compaction strategy for the given sstable
+ *
+ * returns differently based on the repaired status
+ * @param sstable
+ * @return
+ */
+ private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+ {
+ if (sstable.isRepaired())
+ return repaired;
+ else
+ return unrepaired;
+ }
+
+ public void shutdown()
+ {
+ isActive = false;
+ repaired.shutdown();
+ unrepaired.shutdown();
+ }
+
+
+ public synchronized void maybeReload(CFMetaData metadata)
+ {
+ if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass)
+ && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass)
+ && repaired.options.equals(metadata.compactionStrategyOptions) // todo: assumes all have the same options
+ && unrepaired.options.equals(metadata.compactionStrategyOptions))
+ return;
-
+ reload(metadata);
+ }
+
+ /**
+ * Reload the compaction strategies
+ *
+ * Called after changing configuration and at startup.
+ * @param metadata
+ */
+ public synchronized void reload(CFMetaData metadata)
+ {
++ boolean disabledWithJMX = !isEnabled() && shouldBeEnabled();
+ if (repaired != null)
+ repaired.shutdown();
+ if (unrepaired != null)
+ unrepaired.shutdown();
+ repaired = metadata.createCompactionStrategyInstance(cfs);
+ unrepaired = metadata.createCompactionStrategyInstance(cfs);
++ options = ImmutableMap.copyOf(metadata.compactionStrategyOptions);
++ if (disabledWithJMX || !shouldBeEnabled())
++ disable();
++ else
++ enable();
+ startup();
+ }
+
+ public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+ {
+ cfs.getTracker().replaceFlushed(memtable, sstable);
+ if (sstable != null)
+ CompactionManager.instance.submitBackground(cfs);
+ }
+
+ public int getUnleveledSSTables()
+ {
+ if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+ {
+ int count = 0;
+ count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
+ count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+ return count;
+ }
+ return 0;
+ }
+
+ public synchronized int[] getSSTableCountPerLevel()
+ {
+ if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+ {
+ int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+ int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
+ res = sumArrays(res, repairedCountPerLevel);
+ int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
+ res = sumArrays(res, unrepairedCountPerLevel);
+ return res;
+ }
+ return null;
+ }
+
+ private static int[] sumArrays(int[] a, int[] b)
+ {
+ int[] res = new int[Math.max(a.length, b.length)];
+ for (int i = 0; i < res.length; i++)
+ {
+ if (i < a.length && i < b.length)
+ res[i] = a[i] + b[i];
+ else if (i < a.length)
+ res[i] = a[i];
+ else
+ res[i] = b[i];
+ }
+ return res;
+ }
+
+ public boolean shouldDefragment()
+ {
+ assert repaired.getClass().equals(unrepaired.getClass());
+ return repaired.shouldDefragment();
+ }
+
+
+ public synchronized void handleNotification(INotification notification, Object sender)
+ {
+ if (notification instanceof SSTableAddedNotification)
+ {
+ SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
+ if (flushedNotification.added.isRepaired())
+ repaired.addSSTable(flushedNotification.added);
+ else
+ unrepaired.addSSTable(flushedNotification.added);
+ }
+ else if (notification instanceof SSTableListChangedNotification)
+ {
+ SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+ Set<SSTableReader> repairedRemoved = new HashSet<>();
+ Set<SSTableReader> repairedAdded = new HashSet<>();
+ Set<SSTableReader> unrepairedRemoved = new HashSet<>();
+ Set<SSTableReader> unrepairedAdded = new HashSet<>();
+
+ for (SSTableReader sstable : listChangedNotification.removed)
+ {
+ if (sstable.isRepaired())
+ repairedRemoved.add(sstable);
+ else
+ unrepairedRemoved.add(sstable);
+ }
+ for (SSTableReader sstable : listChangedNotification.added)
+ {
+ if (sstable.isRepaired())
+ repairedAdded.add(sstable);
+ else
+ unrepairedAdded.add(sstable);
+ }
+ if (!repairedRemoved.isEmpty())
+ {
+ repaired.replaceSSTables(repairedRemoved, repairedAdded);
+ }
+ else
+ {
+ for (SSTableReader sstable : repairedAdded)
+ repaired.addSSTable(sstable);
+ }
+
+ if (!unrepairedRemoved.isEmpty())
+ {
+ unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
+ }
+ else
+ {
+ for (SSTableReader sstable : unrepairedAdded)
+ unrepaired.addSSTable(sstable);
+ }
+ }
+ else if (notification instanceof SSTableRepairStatusChanged)
+ {
+ for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
+ {
+ if (sstable.isRepaired())
+ {
+ unrepaired.removeSSTable(sstable);
+ repaired.addSSTable(sstable);
+ }
+ else
+ {
+ repaired.removeSSTable(sstable);
+ unrepaired.addSSTable(sstable);
+ }
+ }
+ }
+ else if (notification instanceof SSTableDeletingNotification)
+ {
+ SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
+ if (sstable.isRepaired())
+ repaired.removeSSTable(sstable);
+ else
+ unrepaired.removeSSTable(sstable);
+ }
+ }
+
++ public void enable()
++ {
++ if (repaired != null)
++ repaired.enable();
++ if (unrepaired != null)
++ unrepaired.enable();
++ // enable this last to make sure the strategies are ready to get calls.
++ enabled = true;
++ }
++
++ public void disable()
++ {
++ // disable this first avoid asking disabled strategies for compaction tasks
++ enabled = false;
++ if (repaired != null)
++ repaired.disable();
++ if (unrepaired != null)
++ unrepaired.disable();
++ }
++
+ /**
+ * Create ISSTableScanner from the given sstables
+ *
+ * Delegates the call to the compaction strategies to allow LCS to create a scanner
+ * @param sstables
+ * @param range
+ * @return
+ */
+ @SuppressWarnings("resource")
+ public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+ {
+ List<SSTableReader> repairedSSTables = new ArrayList<>();
+ List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable.isRepaired())
+ repairedSSTables.add(sstable);
+ else
+ unrepairedSSTables.add(sstable);
+ }
+
+
+ AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
+ AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+
+ List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
+ scanners.addAll(repairedScanners.scanners);
+ scanners.addAll(unrepairedScanners.scanners);
+ return new AbstractCompactionStrategy.ScannerList(scanners);
+ }
+
+ public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
+ {
+ return getScanners(sstables, null);
+ }
+
+ public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
+ {
+ return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
+ }
+
+ public long getMaxSSTableBytes()
+ {
+ return unrepaired.getMaxSSTableBytes();
+ }
+
+ public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
+ {
+ return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+ }
+
+ public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
+ {
+ // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
+ // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
+ // sstables are marked the compactions are re-enabled
+ return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
+ {
+ @Override
+ public Collection<AbstractCompactionTask> call() throws Exception
+ {
+ synchronized (CompactionStrategyManager.this)
+ {
+ Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
+ Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
+
+ if (repairedTasks == null && unrepairedTasks == null)
+ return null;
+
+ if (repairedTasks == null)
+ return unrepairedTasks;
+ if (unrepairedTasks == null)
+ return repairedTasks;
+
+ List<AbstractCompactionTask> tasks = new ArrayList<>();
+ tasks.addAll(repairedTasks);
+ tasks.addAll(unrepairedTasks);
+ return tasks;
+ }
+ }
+ }, false);
+ }
+
+ public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+ {
+ return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+ }
+
+ public int getEstimatedRemainingTasks()
+ {
+ int tasks = 0;
+ tasks += repaired.getEstimatedRemainingTasks();
+ tasks += unrepaired.getEstimatedRemainingTasks();
+
+ return tasks;
+ }
+
+ public boolean shouldBeEnabled()
+ {
- String optionValue = cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED);
++ String optionValue = options.get(COMPACTION_ENABLED);
+ return optionValue == null || Boolean.parseBoolean(optionValue);
+ }
+
+ public String getName()
+ {
+ return unrepaired.getName();
+ }
+
+ public List<AbstractCompactionStrategy> getStrategies()
+ {
+ return Arrays.asList(repaired, unrepaired);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 30d38a1,0d06f67..f5cb2a3
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@@ -87,13 -79,13 +84,13 @@@ public class DateTieredCompactionStrate
*/
private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
- if (!isEnabled() || Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
- if (cfs.getSSTables().isEmpty())
++ if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
return Collections.emptyList();
- Set<SSTableReader> uncompacting = Sets.intersection(sstables, cfs.getUncompactingSSTables());
+ Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
// Find fully expired SSTables. Those will be included no matter what.
- Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(uncompacting), gcBefore);
+ Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore);
Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));
List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
[2/3] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/325aeb76
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/325aeb76
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/325aeb76
Branch: refs/heads/trunk
Commit: 325aeb76b9e94b4b108e5608871e4b096f7d8f1c
Parents: a96b207 dce303b
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Jul 29 16:05:05 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jul 29 16:05:05 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compaction/AbstractCompactionStrategy.java | 2 +-
.../DateTieredCompactionStrategy.java | 5 +---
.../compaction/LeveledCompactionStrategy.java | 3 --
.../SizeTieredCompactionStrategy.java | 6 ----
.../compaction/WrappingCompactionStrategy.java | 30 +++++++++++++++++++-
6 files changed, 32 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4717fca,4ef77ed..ec12cb8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,19 -1,7 +1,20 @@@
-2.1.9
+2.2.1
+ * UDF / UDA execution time in trace (CASSANDRA-9723)
+ * Remove repair snapshot leftover on startup (CASSANDRA-7357)
+ * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
+Merged from 2.1:
+ * Fix handling of enable/disable autocompaction (CASSANDRA-9899)
- * Commit log segment recycling is disabled by default (CASSANDRA-9896)
* Add consistency level to tracing ouput (CASSANDRA-9827)
+Merged from 2.0:
+ * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
+ * Don't track hotness when opening from snapshot for validation (CASSANDRA-9382)
+
+2.2.0
+ * Fix cqlsh copy methods and other windows specific issues (CASSANDRA-9795)
+ * Don't wrap byte arrays in SequentialWriter (CASSANDRA-9797)
+ * sum() and avg() functions missing for smallint and tinyint types (CASSANDRA-9671)
+ * Revert CASSANDRA-9542 (allow native functions in UDA) (CASSANDRA-9771)
+Merged from 2.1:
* Fix MarshalException when upgrading superColumn family (CASSANDRA-9582)
* Fix broken logging for "empty" flushes in Memtable (CASSANDRA-9837)
* Handle corrupt files on startup (CASSANDRA-9686)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 43f998a,dec0cef..0d06f67
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@@ -56,12 -55,8 +56,9 @@@ public class DateTieredCompactionStrate
}
@Override
+ @SuppressWarnings("resource")
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
- if (!isEnabled())
- return null;
-
while (true)
{
List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 9eb58ff,8afe6b6..9a88164
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@@ -88,12 -85,16 +88,9 @@@ public class LeveledCompactionStrategy
* the only difference between background and maximal in LCS is that maximal is still allowed
* (by explicit user request) even when compaction is disabled.
*/
+ @SuppressWarnings("resource")
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
- if (!isEnabled())
- Collection<AbstractCompactionTask> tasks = getMaximalTask(gcBefore);
- if (tasks == null || tasks.size() == 0)
-- return null;
- return tasks.iterator().next();
- }
--
- public Collection<AbstractCompactionTask> getMaximalTask(int gcBefore)
- {
while (true)
{
OperationType op;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 74a9757,1f4acdb..4ba2378
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@@ -175,12 -172,8 +172,9 @@@ public class SizeTieredCompactionStrate
return sstr.getReadMeter() == null ? 0.0 : sstr.getReadMeter().twoHourRate() / sstr.estimatedKeys();
}
+ @SuppressWarnings("resource")
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
- if (!isEnabled())
- return null;
-
while (true)
{
List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index adda0c9,e01b4c6..ffe65d7
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@@ -344,7 -350,28 +350,29 @@@ public final class WrappingCompactionSt
}
@Override
+ public void enable()
+ {
+ if (repaired != null)
+ repaired.enable();
+ if (unrepaired != null)
+ unrepaired.enable();
+ // enable this last to make sure the strategies are ready to get calls.
+ super.enable();
+ }
+
+ @Override
+ public void disable()
+ {
+ // disable this first avoid asking disabled strategies for compaction tasks
+ super.disable();
+ if (repaired != null)
+ repaired.disable();
+ if (unrepaired != null)
+ unrepaired.disable();
+ }
+
+ @Override
+ @SuppressWarnings("resource")
public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
{
List<SSTableReader> repairedSSTables = new ArrayList<>();