Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/06/21 08:37:36 UTC

cassandra git commit: Improve calculation of available disk space for compaction

Repository: cassandra
Updated Branches:
  refs/heads/trunk f21202e83 -> 67247394d


Improve calculation of available disk space for compaction

Patch by Krishna Dattu Koneru and Lerh Chuan Low; reviewed by marcuse for CASSANDRA-13068
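
Previously, checkAvailableDiskSpace() estimated the write size over all
sstables in the transaction, including fully expired ones that produce no
output, and reduceScopeForLimitedSpace() could evict a fully expired sstable
instead of a large live one. The check now runs inside the
CompactionController scope: fully expired sstables are excluded from the size
estimate, only non-expired sstables are dropped when reducing scope, and a
compaction consisting solely of fully expired sstables may proceed even with
no free disk, since dropping expired data frees space rather than consuming
it. In outline, the new check looks roughly like this (a condensed sketch of
the patch below, not the literal code):

    Set<SSTableReader> fullyExpired = controller.getFullyExpiredSSTables();
    Set<SSTableReader> nonExpired = Sets.difference(transaction.originals(), fullyExpired);
    while (!nonExpired.isEmpty())
    {
        long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpired, compactionType);
        long estimatedSSTables = Math.max(1, expectedWriteSize / strategy.getMaxSSTableBytes());
        // enough space for the remaining candidates: go ahead with the compaction
        if (cfs.getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
            break;
        // otherwise evict the largest non-expired sstable and retry with a smaller estimate
        if (!reduceScopeForLimitedSpace(nonExpired, expectedWriteSize))
        {
            // nothing left to evict; a compaction of only fully expired sstables may still proceed
            if (partialCompactionsAcceptable() && !fullyExpired.isEmpty())
                break;
            throw new RuntimeException("Not enough space for compaction");
        }
    }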


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67247394
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67247394
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67247394

Branch: refs/heads/trunk
Commit: 67247394d846cf3463d5441b836a47611ec28656
Parents: f21202e
Author: Krishna Koneru <kr...@instaclustr.com>
Authored: Thu Jun 8 10:04:17 2017 +1000
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jun 21 10:34:16 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/compaction/CompactionTask.java | 117 ++++++++++--------
 .../db/compaction/CompactionsBytemanTest.java   | 122 +++++++++++++++++++
 3 files changed, 188 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/67247394/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0968de9..cc1bdb2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Improve calculation of available disk space for compaction (CASSANDRA-13068)
  * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
  * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
  * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67247394/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 789de1e..3d3cd3d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -86,7 +86,7 @@ public class CompactionTask extends AbstractCompactionTask
         return transaction.originals().size();
     }
 
-    public boolean reduceScopeForLimitedSpace(long expectedSize)
+    public boolean reduceScopeForLimitedSpace(Set<SSTableReader> nonExpiredSSTables, long expectedSize)
     {
         if (partialCompactionsAcceptable() && transaction.originals().size() > 1)
         {
@@ -97,7 +97,7 @@ public class CompactionTask extends AbstractCompactionTask
 
             // Note that we have removed files that are still marked as compacting.
             // This is suboptimal but ok since the caller will unmark all the sstables at the end.
-            SSTableReader removedSSTable = cfs.getMaxSizeFile(transaction.originals());
+            SSTableReader removedSSTable = cfs.getMaxSizeFile(nonExpiredSSTables);
             transaction.cancel(removedSSTable);
             return true;
         }
@@ -125,44 +125,46 @@ public class CompactionTask extends AbstractCompactionTask
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
 
-        // note that we need to do a rough estimate early if we can fit the compaction on disk - this is pessimistic, but
-        // since we might remove sstables from the compaction in checkAvailableDiskSpace it needs to be done here
+        try (CompactionController controller = getCompactionController(transaction.originals()))
+        {
 
-        checkAvailableDiskSpace();
+            final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables();
 
-        // sanity check: all sstables must belong to the same cfs
-        assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
-        {
-            @Override
-            public boolean apply(SSTableReader sstable)
+            // select SSTables to compact based on available disk space.
+            buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables);
+
+            // sanity check: all sstables must belong to the same cfs
+            assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
             {
-                return !sstable.descriptor.cfname.equals(cfs.name);
-            }
-        });
+                @Override
+                public boolean apply(SSTableReader sstable)
+                {
+                    return !sstable.descriptor.cfname.equals(cfs.name);
+                }
+            });
 
-        UUID taskId = transaction.opId();
+            UUID taskId = transaction.opId();
 
-        // new sstables from flush can be added during a compaction, but only the compaction can remove them,
-        // so in our single-threaded compaction world this is a valid way of determining if we're compacting
-        // all the sstables (that existed when we started)
-        StringBuilder ssTableLoggerMsg = new StringBuilder("[");
-        for (SSTableReader sstr : transaction.originals())
-        {
-            ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
-        }
-        ssTableLoggerMsg.append("]");
+            // new sstables from flush can be added during a compaction, but only the compaction can remove them,
+            // so in our single-threaded compaction world this is a valid way of determining if we're compacting
+            // all the sstables (that existed when we started)
+            StringBuilder ssTableLoggerMsg = new StringBuilder("[");
+            for (SSTableReader sstr : transaction.originals())
+            {
+                ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
+            }
+            ssTableLoggerMsg.append("]");
 
-        logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg);
+            logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg);
 
-        RateLimiter limiter = CompactionManager.instance.getRateLimiter();
-        long start = System.nanoTime();
-        long startTime = System.currentTimeMillis();
-        long totalKeysWritten = 0;
-        long estimatedKeys = 0;
-        long inputSizeBytes;
-        try (CompactionController controller = getCompactionController(transaction.originals()))
-        {
-            Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
+            RateLimiter limiter = CompactionManager.instance.getRateLimiter();
+            long start = System.nanoTime();
+            long startTime = System.currentTimeMillis();
+            long totalKeysWritten = 0;
+            long estimatedKeys = 0;
+            long inputSizeBytes;
+
+            Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), fullyExpiredSSTables);
             Collection<SSTableReader> newSStables;
 
             long[] mergedRowCounts;
@@ -335,52 +337,63 @@ public class CompactionTask extends AbstractCompactionTask
         return ids.iterator().next();
     }
 
+
     /*
-    Checks if we have enough disk space to execute the compaction.  Drops the largest sstable out of the Task until
-    there's enough space (in theory) to handle the compaction.  Does not take into account space that will be taken by
-    other compactions.
+     * Checks if we have enough disk space to execute the compaction.  Drops the largest sstable out of the Task until
+     * there's enough space (in theory) to handle the compaction.  Does not take into account space that will be taken by
+     * other compactions.
      */
-    protected void checkAvailableDiskSpace()
+    protected void buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables)
     {
         if(!cfs.isCompactionDiskSpaceCheckEnabled() && compactionType == OperationType.COMPACTION)
         {
             logger.info("Compaction space check is disabled");
-            return;
+            return; // try to compact all SSTables
         }
 
+        final Set<SSTableReader> nonExpiredSSTables = Sets.difference(transaction.originals(), fullyExpiredSSTables);
         CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
         int sstablesRemoved = 0;
-        while(true)
+
+        while(!nonExpiredSSTables.isEmpty())
         {
-            long expectedWriteSize = cfs.getExpectedCompactedFileSize(transaction.originals(), compactionType);
+            // Only consider the write size of non-expired SSTables
+            long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
             long estimatedSSTables = Math.max(1, expectedWriteSize / strategy.getMaxSSTableBytes());
 
             if(cfs.getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
-            {
-                // we're ok now on space so now we track the failures, if any
-                if(sstablesRemoved > 0)
-                {
-                    CompactionManager.instance.incrementCompactionsReduced();
-                    CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
-                }
-
                 break;
-            }
 
-            if (!reduceScopeForLimitedSpace(expectedWriteSize))
+            if (!reduceScopeForLimitedSpace(nonExpiredSSTables, expectedWriteSize))
             {
                 // we end up here if we can't take any more sstables out of the compaction.
                 // usually means we've run out of disk space
-                String msg = String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize);
 
+                // but we can still compact expired SSTables
+                if(partialCompactionsAcceptable() && fullyExpiredSSTables.size() > 0)
+                {
+                    // sanity check to make sure we compact only fully expired SSTables.
+                    assert transaction.originals().equals(fullyExpiredSSTables);
+                    break;
+                }
+
+                String msg = String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize);
                 logger.warn(msg);
                 CompactionManager.instance.incrementAborted();
                 throw new RuntimeException(msg);
             }
+
             sstablesRemoved++;
             logger.warn("Not enough space for compaction, {}MB estimated.  Reducing scope.",
-                            (float) expectedWriteSize / 1024 / 1024);
+                        (float) expectedWriteSize / 1024 / 1024);
         }
+
+        if(sstablesRemoved > 0)
+        {
+            CompactionManager.instance.incrementCompactionsReduced();
+            CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
+        }
+
     }
 
     protected int getLevel()
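
To illustrate the effect with hypothetical figures: suppose a compaction covers
a 10 GB sstable, a 5 GB sstable and a fully expired 8 GB sstable. The old check
sized the job over all three (roughly 23 GB of expected output, the exact
estimate depending on the compaction type), while the new one counts only the
15 GB of non-expired data. If even that does not fit, reduceScopeForLimitedSpace
evicts the largest non-expired sstable and the loop retries with a roughly 5 GB
estimate; and once only fully expired sstables remain, the compaction can still
proceed, since dropping expired data frees disk space instead of consuming it.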

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67247394/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
new file mode 100644
index 0000000..3ca01c1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(BMUnitRunner.class)
+public class CompactionsBytemanTest extends CQLTester
+{
+    /*
+    Return false the first time hasAvailableDiskSpace is called, i.e. the disk looks too full for the first estimate.
+    Create 5 SSTables. After compaction, there should be 2 left - 1 from the 4 SSTables that were merged,
+    and the other the (largest) SSTable that was dropped when the hasAvailableDiskSpace check failed.
+     */
+    @Test
+    @BMRules(rules = { @BMRule(name = "One SSTable too big for remaining disk space test",
+    targetClass = "Directories",
+    targetMethod = "hasAvailableDiskSpace",
+    condition = "not flagged(\"done\")",
+    action = "flag(\"done\"); return false;") } )
+    public void testSSTableNotEnoughDiskSpaceForCompactionGetsDropped() throws Throwable
+    {
+        createLowGCGraceTable();
+        final ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        for (int i = 0; i < 5; i++)
+        {
+            createPossiblyExpiredSSTable(cfs, false);
+        }
+        assertEquals(5, cfs.getLiveSSTables().size());
+        cfs.forceMajorCompaction(false);
+        assertEquals(2, cfs.getLiveSSTables().size());
+        dropTable("DROP TABLE %s");
+    }
+
+    /*
+    Always return false for hasAvailableDiskSpace, i.e. the node has no more space.
+    Create 2 expired SSTables and 1 long-lived one. After compaction, there should be only 1 left,
+    as the 2 expired ones will have been compacted away.
+     */
+    @Test
+    @BMRules(rules = { @BMRule(name = "No disk space with expired SSTables test",
+    targetClass = "Directories",
+    targetMethod = "hasAvailableDiskSpace",
+    action = "return false;") } )
+    public void testExpiredSSTablesStillGetDroppedWithNoDiskSpace() throws Throwable
+    {
+        createLowGCGraceTable();
+        final ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        createPossiblyExpiredSSTable(cfs, true);
+        createPossiblyExpiredSSTable(cfs, true);
+        createPossiblyExpiredSSTable(cfs, false);
+        assertEquals(3, cfs.getLiveSSTables().size());
+        TimeUnit.MILLISECONDS.sleep(1500); // give the TTL'd rows time to expire.
+        cfs.forceMajorCompaction(false);
+        assertEquals(1, cfs.getLiveSSTables().size());
+        dropTable("DROP TABLE %s");
+    }
+
+    /*
+    Always return false for hasAvailableDiskSpace, i.e. the node has no more space.
+    Create 2 non-expired SSTables. Compaction cannot succeed and will throw a RuntimeException.
+     */
+    @Test(expected = RuntimeException.class)
+    @BMRules(rules = { @BMRule(name = "No disk space for compaction test",
+    targetClass = "Directories",
+    targetMethod = "hasAvailableDiskSpace",
+    action = "return false;") } )
+    public void testRuntimeExceptionWhenNoDiskSpaceForCompaction() throws Throwable
+    {
+        createLowGCGraceTable();
+        final ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        createPossiblyExpiredSSTable(cfs, false);
+        createPossiblyExpiredSSTable(cfs, false);
+        cfs.forceMajorCompaction(false);
+        dropTable("DROP TABLE %s");
+    }
+
+    private void createPossiblyExpiredSSTable(final ColumnFamilyStore cfs, final boolean expired) throws Throwable
+    {
+        if (expired)
+        {
+            execute("INSERT INTO %s (id, val) values (1, 'expired') USING TTL 1");
+            TimeUnit.MILLISECONDS.sleep(1500); // give the TTL'd row time to expire
+        }
+        else
+        {
+            execute("INSERT INTO %s (id, val) values (2, 'immortal')");
+        }
+        cfs.forceBlockingFlush();
+    }
+
+    private void createLowGCGraceTable() {
+        createTable("CREATE TABLE %s (id int PRIMARY KEY, val text) WITH compaction = {'class':'SizeTieredCompactionStrategy', 'enabled': 'false'} AND gc_grace_seconds = 0");
+    }
+}
\ No newline at end of file
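
A note on the Byteman mechanics in these tests: BMUnitRunner weaves each
@BMRule into the target method for the duration of the test, so
Directories.hasAvailableDiskSpace can be forced to report a full disk without
mocking the filesystem. In the first test, the condition not flagged("done")
together with the action flag("done"); return false; makes the stub fire
exactly once: the first space check fails (dropping one sstable from the
compaction) and later checks fall through to the real implementation. Written
as a standalone Byteman rule script, that first rule would read roughly as
follows (illustrative only, not part of the patch):

    RULE One SSTable too big for remaining disk space test
    CLASS Directories
    METHOD hasAvailableDiskSpace
    IF not flagged("done")
    DO flag("done"); return false
    ENDRULE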

