You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/07/29 17:12:52 UTC

[1/3] cassandra git commit: Fix handling of enable/disable autocompaction.

Repository: cassandra
Updated Branches:
  refs/heads/trunk 3e75d5a62 -> d26187e5c


Fix handling of enable/disable autocompaction.

Patch by marcuse; reviewed by Jeremiah Jordan for CASSANDRA-9899


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dce303bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dce303bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dce303bc

Branch: refs/heads/trunk
Commit: dce303bcb5748d302448769177a245a30ec3cc19
Parents: 7395207
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jul 27 09:31:08 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jul 29 15:58:21 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compaction/AbstractCompactionStrategy.java  |  2 +-
 .../DateTieredCompactionStrategy.java           |  5 +---
 .../compaction/LeveledCompactionStrategy.java   |  2 --
 .../SizeTieredCompactionStrategy.java           |  6 ----
 .../compaction/WrappingCompactionStrategy.java  | 30 +++++++++++++++++++-
 6 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c4bb21c..4ef77ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.9
+ * Fix handling of enable/disable autocompaction (CASSANDRA-9899)
  * Commit log segment recycling is disabled by default (CASSANDRA-9896)
  * Add consistency level to tracing ouput (CASSANDRA-9827)
  * Fix MarshalException when upgrading superColumn family (CASSANDRA-9582)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 9eac2d0..73cda77 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -60,7 +60,7 @@ public abstract class AbstractCompactionStrategy
     protected static final String UNCHECKED_TOMBSTONE_COMPACTION_OPTION = "unchecked_tombstone_compaction";
     protected static final String COMPACTION_ENABLED = "enabled";
 
-    public final Map<String, String> options;
+    protected Map<String, String> options;
 
     protected final ColumnFamilyStore cfs;
     protected float tombstoneThreshold;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 15287bd..dec0cef 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -57,9 +57,6 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     @Override
     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
-        if (!isEnabled())
-            return null;
-
         while (true)
         {
             List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);
@@ -79,7 +76,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
      */
     private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
-        if (!isEnabled() || cfs.getSSTables().isEmpty())
+        if (cfs.getSSTables().isEmpty())
             return Collections.emptyList();
 
         Set<SSTableReader> uncompacting = Sets.intersection(sstables, cfs.getUncompactingSSTables());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index f9e5d16..8afe6b6 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -87,8 +87,6 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
      */
     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
-        if (!isEnabled())
-            return null;
         Collection<AbstractCompactionTask> tasks = getMaximalTask(gcBefore);
         if (tasks == null || tasks.size() == 0)
             return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 93484e8..1f4acdb 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -75,9 +75,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
     private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
-        if (!isEnabled())
-            return Collections.emptyList();
-
         // make local copies so they can't be changed out from under us mid-method
         int minThreshold = cfs.getMinimumCompactionThreshold();
         int maxThreshold = cfs.getMaximumCompactionThreshold();
@@ -177,9 +174,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
-        if (!isEnabled())
-            return null;
-
         while (true)
         {
             List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dce303bc/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index 2f8bd7c..e01b4c6 100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -150,18 +151,23 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
             && repaired.options.equals(metadata.compactionStrategyOptions)
             && unrepaired.options.equals(metadata.compactionStrategyOptions))
             return;
-
         reloadCompactionStrategy(metadata);
     }
 
     public synchronized void reloadCompactionStrategy(CFMetaData metadata)
     {
+        boolean disabledWithJMX = !isEnabled() && shouldBeEnabled();
         if (repaired != null)
             repaired.shutdown();
         if (unrepaired != null)
             unrepaired.shutdown();
         repaired = metadata.createCompactionStrategyInstance(cfs);
         unrepaired = metadata.createCompactionStrategyInstance(cfs);
+        options = ImmutableMap.copyOf(metadata.compactionStrategyOptions);
+        if (disabledWithJMX || !shouldBeEnabled())
+            disable();
+        else
+            enable();
         startup();
     }
 
@@ -344,6 +350,28 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
     }
 
     @Override
+    public void enable()
+    {
+        if (repaired != null)
+            repaired.enable();
+        if (unrepaired != null)
+            unrepaired.enable();
+        // enable this last to make sure the strategies are ready to get calls.
+        super.enable();
+    }
+
+    @Override
+    public void disable()
+    {
+        // disable this first avoid asking disabled strategies for compaction tasks
+        super.disable();
+        if (repaired != null)
+            repaired.disable();
+        if (unrepaired != null)
+            unrepaired.disable();
+    }
+
+    @Override
     public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         List<SSTableReader> repairedSSTables = new ArrayList<>();


[3/3] cassandra git commit: Merge branch 'cassandra-2.2' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
	src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d26187e5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d26187e5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d26187e5

Branch: refs/heads/trunk
Commit: d26187e5c2ab2cf08d8c874387b4674b860f5e4d
Parents: 3e75d5a 325aeb7
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Jul 29 17:04:15 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jul 29 17:04:15 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compaction/AbstractCompactionStrategy.java  | 11 +----
 .../compaction/CompactionStrategyManager.java   | 49 +++++++++++++-------
 .../DateTieredCompactionStrategy.java           |  5 +-
 .../compaction/LeveledCompactionStrategy.java   |  3 --
 .../SizeTieredCompactionStrategy.java           |  6 ---
 6 files changed, 34 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 6598286,05dffc8..379d3de
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -85,8 -81,8 +85,6 @@@ public abstract class AbstractCompactio
       */
      protected boolean isActive = false;
  
--    protected volatile boolean enabled = true;
--
      protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
      {
          assert cfs != null;
@@@ -195,19 -191,19 +193,12 @@@
       */
      public abstract long getMaxSSTableBytes();
  
--    public boolean isEnabled()
--    {
--        return this.enabled && this.isActive;
--    }
--
      public void enable()
      {
--        this.enabled = true;
      }
  
      public void disable()
      {
--        this.enabled = false;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 8ec7071,0000000..766eb1b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -1,448 -1,0 +1,461 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.compaction;
 +
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.HashSet;
 +import java.util.List;
++import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.Callable;
 +
++import com.google.common.collect.ImmutableMap;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Memtable;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.notifications.INotification;
 +import org.apache.cassandra.notifications.INotificationConsumer;
 +import org.apache.cassandra.notifications.SSTableAddedNotification;
 +import org.apache.cassandra.notifications.SSTableDeletingNotification;
 +import org.apache.cassandra.notifications.SSTableListChangedNotification;
 +import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
 +
 +/**
 + * Manages the compaction strategies.
 + *
 + * Currently has two instances of actual compaction strategies - one for repaired data and one for
 + * unrepaired data. This is done to be able to totally separate the different sets of sstables.
 + */
 +public class CompactionStrategyManager implements INotificationConsumer
 +{
 +    protected static final String COMPACTION_ENABLED = "enabled";
 +    private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
 +    private final ColumnFamilyStore cfs;
 +    private volatile AbstractCompactionStrategy repaired;
 +    private volatile AbstractCompactionStrategy unrepaired;
 +    private volatile boolean enabled = true;
 +    public boolean isActive = true;
++    private Map<String, String> options;
 +
 +    public CompactionStrategyManager(ColumnFamilyStore cfs)
 +    {
 +        cfs.getTracker().subscribe(this);
 +        logger.debug("{} subscribed to the data tracker.", this);
 +        this.cfs = cfs;
 +        reload(cfs.metadata);
 +        String optionValue = cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED);
 +        enabled = optionValue == null || Boolean.parseBoolean(optionValue);
++        options = ImmutableMap.copyOf(cfs.metadata.compactionStrategyOptions);
 +    }
 +
 +    /**
 +     * Return the next background task
 +     *
 +     * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
 +     *
 +     */
 +    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
 +    {
 +        if (!isEnabled())
 +            return null;
 +
 +        maybeReload(cfs.metadata);
 +
 +        if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
 +        {
 +            AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
 +            if (repairedTask != null)
 +                return repairedTask;
 +            return unrepaired.getNextBackgroundTask(gcBefore);
 +        }
 +        else
 +        {
 +            AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
 +            if (unrepairedTask != null)
 +                return unrepairedTask;
 +            return repaired.getNextBackgroundTask(gcBefore);
 +        }
 +    }
 +
-     /**
-      * Disable compaction - used with nodetool disableautocompaction for example
-      */
-     public void disable()
-     {
-         enabled = false;
-     }
- 
-     /**
-      * re-enable disabled compaction.
-      */
-     public void enable()
-     {
-         enabled = true;
-     }
- 
 +    public boolean isEnabled()
 +    {
 +        return enabled && isActive;
 +    }
 +
 +    public synchronized void resume()
 +    {
 +        isActive = true;
 +    }
 +
 +    /**
 +     * pause compaction while we cancel all ongoing compactions
 +     *
 +     * Separate call from enable/disable to not have to save the enabled-state externally
 +      */
 +    public synchronized void pause()
 +    {
 +        isActive = false;
 +    }
 +
 +
 +    private void startup()
 +    {
 +        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
 +        {
 +            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
 +                getCompactionStrategyFor(sstable).addSSTable(sstable);
 +        }
 +        repaired.startup();
 +        unrepaired.startup();
 +    }
 +
 +    /**
 +     * return the compaction strategy for the given sstable
 +     *
 +     * returns differently based on the repaired status
 +     * @param sstable
 +     * @return
 +     */
 +    private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
 +    {
 +        if (sstable.isRepaired())
 +            return repaired;
 +        else
 +            return unrepaired;
 +    }
 +
 +    public void shutdown()
 +    {
 +        isActive = false;
 +        repaired.shutdown();
 +        unrepaired.shutdown();
 +    }
 +
 +
 +    public synchronized void maybeReload(CFMetaData metadata)
 +    {
 +        if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass)
 +                && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass)
 +                && repaired.options.equals(metadata.compactionStrategyOptions) // todo: assumes all have the same options
 +                && unrepaired.options.equals(metadata.compactionStrategyOptions))
 +            return;
- 
 +        reload(metadata);
 +    }
 +
 +    /**
 +     * Reload the compaction strategies
 +     *
 +     * Called after changing configuration and at startup.
 +     * @param metadata
 +     */
 +    public synchronized void reload(CFMetaData metadata)
 +    {
++        boolean disabledWithJMX = !isEnabled() && shouldBeEnabled();
 +        if (repaired != null)
 +            repaired.shutdown();
 +        if (unrepaired != null)
 +            unrepaired.shutdown();
 +        repaired = metadata.createCompactionStrategyInstance(cfs);
 +        unrepaired = metadata.createCompactionStrategyInstance(cfs);
++        options = ImmutableMap.copyOf(metadata.compactionStrategyOptions);
++        if (disabledWithJMX || !shouldBeEnabled())
++            disable();
++        else
++            enable();
 +        startup();
 +    }
 +
 +    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
 +    {
 +        cfs.getTracker().replaceFlushed(memtable, sstable);
 +        if (sstable != null)
 +            CompactionManager.instance.submitBackground(cfs);
 +    }
 +
 +    public int getUnleveledSSTables()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
 +        {
 +            int count = 0;
 +            count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
 +            count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
 +            return count;
 +        }
 +        return 0;
 +    }
 +
 +    public synchronized int[] getSSTableCountPerLevel()
 +    {
 +        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
 +        {
 +            int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
 +            int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
 +            res = sumArrays(res, repairedCountPerLevel);
 +            int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
 +            res = sumArrays(res, unrepairedCountPerLevel);
 +            return res;
 +        }
 +        return null;
 +    }
 +
 +    private static int[] sumArrays(int[] a, int[] b)
 +    {
 +        int[] res = new int[Math.max(a.length, b.length)];
 +        for (int i = 0; i < res.length; i++)
 +        {
 +            if (i < a.length && i < b.length)
 +                res[i] = a[i] + b[i];
 +            else if (i < a.length)
 +                res[i] = a[i];
 +            else
 +                res[i] = b[i];
 +        }
 +        return res;
 +    }
 +
 +    public boolean shouldDefragment()
 +    {
 +        assert repaired.getClass().equals(unrepaired.getClass());
 +        return repaired.shouldDefragment();
 +    }
 +
 +
 +    public synchronized void handleNotification(INotification notification, Object sender)
 +    {
 +        if (notification instanceof SSTableAddedNotification)
 +        {
 +            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
 +            if (flushedNotification.added.isRepaired())
 +                repaired.addSSTable(flushedNotification.added);
 +            else
 +                unrepaired.addSSTable(flushedNotification.added);
 +        }
 +        else if (notification instanceof SSTableListChangedNotification)
 +        {
 +            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
 +            Set<SSTableReader> repairedRemoved = new HashSet<>();
 +            Set<SSTableReader> repairedAdded = new HashSet<>();
 +            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
 +            Set<SSTableReader> unrepairedAdded = new HashSet<>();
 +
 +            for (SSTableReader sstable : listChangedNotification.removed)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedRemoved.add(sstable);
 +                else
 +                    unrepairedRemoved.add(sstable);
 +            }
 +            for (SSTableReader sstable : listChangedNotification.added)
 +            {
 +                if (sstable.isRepaired())
 +                    repairedAdded.add(sstable);
 +                else
 +                    unrepairedAdded.add(sstable);
 +            }
 +            if (!repairedRemoved.isEmpty())
 +            {
 +                repaired.replaceSSTables(repairedRemoved, repairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : repairedAdded)
 +                    repaired.addSSTable(sstable);
 +            }
 +
 +            if (!unrepairedRemoved.isEmpty())
 +            {
 +                unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
 +            }
 +            else
 +            {
 +                for (SSTableReader sstable : unrepairedAdded)
 +                    unrepaired.addSSTable(sstable);
 +            }
 +        }
 +        else if (notification instanceof SSTableRepairStatusChanged)
 +        {
 +            for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
 +            {
 +                if (sstable.isRepaired())
 +                {
 +                    unrepaired.removeSSTable(sstable);
 +                    repaired.addSSTable(sstable);
 +                }
 +                else
 +                {
 +                    repaired.removeSSTable(sstable);
 +                    unrepaired.addSSTable(sstable);
 +                }
 +            }
 +        }
 +        else if (notification instanceof SSTableDeletingNotification)
 +        {
 +            SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
 +            if (sstable.isRepaired())
 +                repaired.removeSSTable(sstable);
 +            else
 +                unrepaired.removeSSTable(sstable);
 +        }
 +    }
 +
++    public void enable()
++    {
++        if (repaired != null)
++            repaired.enable();
++        if (unrepaired != null)
++            unrepaired.enable();
++        // enable this last to make sure the strategies are ready to get calls.
++        enabled = true;
++    }
++
++    public void disable()
++    {
++        // disable this first avoid asking disabled strategies for compaction tasks
++        enabled = false;
++        if (repaired != null)
++            repaired.disable();
++        if (unrepaired != null)
++            unrepaired.disable();
++    }
++
 +    /**
 +     * Create ISSTableScanner from the given sstables
 +     *
 +     * Delegates the call to the compaction strategies to allow LCS to create a scanner
 +     * @param sstables
 +     * @param range
 +     * @return
 +     */
 +    @SuppressWarnings("resource")
 +    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
 +    {
 +        List<SSTableReader> repairedSSTables = new ArrayList<>();
 +        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable.isRepaired())
 +                repairedSSTables.add(sstable);
 +            else
 +                unrepairedSSTables.add(sstable);
 +        }
 +
 +
 +        AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
 +        AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
 +
 +        List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
 +        scanners.addAll(repairedScanners.scanners);
 +        scanners.addAll(unrepairedScanners.scanners);
 +        return new AbstractCompactionStrategy.ScannerList(scanners);
 +    }
 +
 +    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
 +    {
 +        return getScanners(sstables, null);
 +    }
 +
 +    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
 +    {
 +        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
 +    }
 +
 +    public long getMaxSSTableBytes()
 +    {
 +        return unrepaired.getMaxSSTableBytes();
 +    }
 +
 +    public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
 +    {
 +        return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
 +    }
 +
 +    public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
 +    {
 +        // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
 +        // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
 +        // sstables are marked the compactions are re-enabled
 +        return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
 +        {
 +            @Override
 +            public Collection<AbstractCompactionTask> call() throws Exception
 +            {
 +                synchronized (CompactionStrategyManager.this)
 +                {
 +                    Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
 +                    Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
 +
 +                    if (repairedTasks == null && unrepairedTasks == null)
 +                        return null;
 +
 +                    if (repairedTasks == null)
 +                        return unrepairedTasks;
 +                    if (unrepairedTasks == null)
 +                        return repairedTasks;
 +
 +                    List<AbstractCompactionTask> tasks = new ArrayList<>();
 +                    tasks.addAll(repairedTasks);
 +                    tasks.addAll(unrepairedTasks);
 +                    return tasks;
 +                }
 +            }
 +        }, false);
 +    }
 +
 +    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
 +    {
 +        return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
 +    }
 +
 +    public int getEstimatedRemainingTasks()
 +    {
 +        int tasks = 0;
 +        tasks += repaired.getEstimatedRemainingTasks();
 +        tasks += unrepaired.getEstimatedRemainingTasks();
 +
 +        return tasks;
 +    }
 +
 +    public boolean shouldBeEnabled()
 +    {
-         String optionValue = cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED);
++        String optionValue = options.get(COMPACTION_ENABLED);
 +        return optionValue == null || Boolean.parseBoolean(optionValue);
 +    }
 +
 +    public String getName()
 +    {
 +        return unrepaired.getName();
 +    }
 +
 +    public List<AbstractCompactionStrategy> getStrategies()
 +    {
 +        return Arrays.asList(repaired, unrepaired);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 30d38a1,0d06f67..f5cb2a3
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@@ -87,13 -79,13 +84,13 @@@ public class DateTieredCompactionStrate
       */
      private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
      {
-         if (!isEnabled() || Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
 -        if (cfs.getSSTables().isEmpty())
++        if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
              return Collections.emptyList();
  
 -        Set<SSTableReader> uncompacting = Sets.intersection(sstables, cfs.getUncompactingSSTables());
 +        Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
  
          // Find fully expired SSTables. Those will be included no matter what.
 -        Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(uncompacting), gcBefore);
 +        Set<SSTableReader> expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore);
          Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));
  
          List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d26187e5/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------


[2/3] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
	src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/325aeb76
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/325aeb76
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/325aeb76

Branch: refs/heads/trunk
Commit: 325aeb76b9e94b4b108e5608871e4b096f7d8f1c
Parents: a96b207 dce303b
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Jul 29 16:05:05 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jul 29 16:05:05 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compaction/AbstractCompactionStrategy.java  |  2 +-
 .../DateTieredCompactionStrategy.java           |  5 +---
 .../compaction/LeveledCompactionStrategy.java   |  3 --
 .../SizeTieredCompactionStrategy.java           |  6 ----
 .../compaction/WrappingCompactionStrategy.java  | 30 +++++++++++++++++++-
 6 files changed, 32 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4717fca,4ef77ed..ec12cb8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,19 -1,7 +1,20 @@@
 -2.1.9
 +2.2.1
 + * UDF / UDA execution time in trace (CASSANDRA-9723)
 + * Remove repair snapshot leftover on startup (CASSANDRA-7357)
 + * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
 +Merged from 2.1:
+  * Fix handling of enable/disable autocompaction (CASSANDRA-9899)
 - * Commit log segment recycling is disabled by default (CASSANDRA-9896)
   * Add consistency level to tracing ouput (CASSANDRA-9827)
 +Merged from 2.0:
 + * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
 + * Don't track hotness when opening from snapshot for validation (CASSANDRA-9382)
 +
 +2.2.0
 + * Fix cqlsh copy methods and other windows specific issues (CASSANDRA-9795) 
 + * Don't wrap byte arrays in SequentialWriter (CASSANDRA-9797)
 + * sum() and avg() functions missing for smallint and tinyint types (CASSANDRA-9671)
 + * Revert CASSANDRA-9542 (allow native functions in UDA) (CASSANDRA-9771)
 +Merged from 2.1:
   * Fix MarshalException when upgrading superColumn family (CASSANDRA-9582)
   * Fix broken logging for "empty" flushes in Memtable (CASSANDRA-9837)
   * Handle corrupt files on startup (CASSANDRA-9686)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 43f998a,dec0cef..0d06f67
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@@ -56,12 -55,8 +56,9 @@@ public class DateTieredCompactionStrate
      }
  
      @Override
 +    @SuppressWarnings("resource")
      public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
      {
-         if (!isEnabled())
-             return null;
- 
          while (true)
          {
              List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 9eb58ff,8afe6b6..9a88164
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@@ -88,12 -85,16 +88,9 @@@ public class LeveledCompactionStrategy 
       * the only difference between background and maximal in LCS is that maximal is still allowed
       * (by explicit user request) even when compaction is disabled.
       */
 +    @SuppressWarnings("resource")
      public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
      {
-         if (!isEnabled())
 -        Collection<AbstractCompactionTask> tasks = getMaximalTask(gcBefore);
 -        if (tasks == null || tasks.size() == 0)
--            return null;
 -        return tasks.iterator().next();
 -    }
--
 -    public Collection<AbstractCompactionTask> getMaximalTask(int gcBefore)
 -    {
          while (true)
          {
              OperationType op;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 74a9757,1f4acdb..4ba2378
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@@ -175,12 -172,8 +172,9 @@@ public class SizeTieredCompactionStrate
          return sstr.getReadMeter() == null ? 0.0 : sstr.getReadMeter().twoHourRate() / sstr.estimatedKeys();
      }
  
 +    @SuppressWarnings("resource")
      public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
      {
-         if (!isEnabled())
-             return null;
- 
          while (true)
          {
              List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/325aeb76/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index adda0c9,e01b4c6..ffe65d7
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@@ -344,7 -350,28 +350,29 @@@ public final class WrappingCompactionSt
      }
  
      @Override
+     public void enable()
+     {
+         if (repaired != null)
+             repaired.enable();
+         if (unrepaired != null)
+             unrepaired.enable();
+         // enable this last to make sure the strategies are ready to get calls.
+         super.enable();
+     }
+ 
+     @Override
+     public void disable()
+     {
+         // disable this first avoid asking disabled strategies for compaction tasks
+         super.disable();
+         if (repaired != null)
+             repaired.disable();
+         if (unrepaired != null)
+             unrepaired.disable();
+     }
+ 
+     @Override
 +    @SuppressWarnings("resource")
      public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
      {
          List<SSTableReader> repairedSSTables = new ArrayList<>();