You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2017/06/21 08:37:36 UTC
cassandra git commit: Improve calculation of available disk space for
compaction
Repository: cassandra
Updated Branches:
refs/heads/trunk f21202e83 -> 67247394d
Improve calculation of available disk space for compaction
Patch by Krishna Dattu Koneru and Lerh Chuan Low; reviewed by marcuse for CASSANDRA-13068
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67247394
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67247394
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67247394
Branch: refs/heads/trunk
Commit: 67247394d846cf3463d5441b836a47611ec28656
Parents: f21202e
Author: Krishna Koneru <kr...@instaclustr.com>
Authored: Thu Jun 8 10:04:17 2017 +1000
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jun 21 10:34:16 2017 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionTask.java | 117 ++++++++++--------
.../db/compaction/CompactionsBytemanTest.java | 122 +++++++++++++++++++
3 files changed, 188 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67247394/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0968de9..cc1bdb2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Improve calculation of available disk space for compaction (CASSANDRA-13068)
* Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
* Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
* NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67247394/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 789de1e..3d3cd3d 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -86,7 +86,7 @@ public class CompactionTask extends AbstractCompactionTask
return transaction.originals().size();
}
- public boolean reduceScopeForLimitedSpace(long expectedSize)
+ public boolean reduceScopeForLimitedSpace(Set<SSTableReader> nonExpiredSSTables, long expectedSize)
{
if (partialCompactionsAcceptable() && transaction.originals().size() > 1)
{
@@ -97,7 +97,7 @@ public class CompactionTask extends AbstractCompactionTask
// Note that we have removed files that are still marked as compacting.
// This suboptimal but ok since the caller will unmark all the sstables at the end.
- SSTableReader removedSSTable = cfs.getMaxSizeFile(transaction.originals());
+ SSTableReader removedSSTable = cfs.getMaxSizeFile(nonExpiredSSTables);
transaction.cancel(removedSSTable);
return true;
}
@@ -125,44 +125,46 @@ public class CompactionTask extends AbstractCompactionTask
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
- // note that we need to do a rough estimate early if we can fit the compaction on disk - this is pessimistic, but
- // since we might remove sstables from the compaction in checkAvailableDiskSpace it needs to be done here
+ try (CompactionController controller = getCompactionController(transaction.originals()))
+ {
- checkAvailableDiskSpace();
+ final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables();
- // sanity check: all sstables must belong to the same cfs
- assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
- {
- @Override
- public boolean apply(SSTableReader sstable)
+ // select SSTables to compact based on available disk space.
+ buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables);
+
+ // sanity check: all sstables must belong to the same cfs
+ assert !Iterables.any(transaction.originals(), new Predicate<SSTableReader>()
{
- return !sstable.descriptor.cfname.equals(cfs.name);
- }
- });
+ @Override
+ public boolean apply(SSTableReader sstable)
+ {
+ return !sstable.descriptor.cfname.equals(cfs.name);
+ }
+ });
- UUID taskId = transaction.opId();
+ UUID taskId = transaction.opId();
- // new sstables from flush can be added during a compaction, but only the compaction can remove them,
- // so in our single-threaded compaction world this is a valid way of determining if we're compacting
- // all the sstables (that existed when we started)
- StringBuilder ssTableLoggerMsg = new StringBuilder("[");
- for (SSTableReader sstr : transaction.originals())
- {
- ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
- }
- ssTableLoggerMsg.append("]");
+ // new sstables from flush can be added during a compaction, but only the compaction can remove them,
+ // so in our single-threaded compaction world this is a valid way of determining if we're compacting
+ // all the sstables (that existed when we started)
+ StringBuilder ssTableLoggerMsg = new StringBuilder("[");
+ for (SSTableReader sstr : transaction.originals())
+ {
+ ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
+ }
+ ssTableLoggerMsg.append("]");
- logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg);
+ logger.debug("Compacting ({}) {}", taskId, ssTableLoggerMsg);
- RateLimiter limiter = CompactionManager.instance.getRateLimiter();
- long start = System.nanoTime();
- long startTime = System.currentTimeMillis();
- long totalKeysWritten = 0;
- long estimatedKeys = 0;
- long inputSizeBytes;
- try (CompactionController controller = getCompactionController(transaction.originals()))
- {
- Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
+ RateLimiter limiter = CompactionManager.instance.getRateLimiter();
+ long start = System.nanoTime();
+ long startTime = System.currentTimeMillis();
+ long totalKeysWritten = 0;
+ long estimatedKeys = 0;
+ long inputSizeBytes;
+
+ Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), fullyExpiredSSTables);
Collection<SSTableReader> newSStables;
long[] mergedRowCounts;
@@ -335,52 +337,63 @@ public class CompactionTask extends AbstractCompactionTask
return ids.iterator().next();
}
+
/*
- Checks if we have enough disk space to execute the compaction. Drops the largest sstable out of the Task until
- there's enough space (in theory) to handle the compaction. Does not take into account space that will be taken by
- other compactions.
+ * Checks if we have enough disk space to execute the compaction. Drops the largest sstable out of the Task until
+ * there's enough space (in theory) to handle the compaction. Does not take into account space that will be taken by
+ * other compactions.
*/
- protected void checkAvailableDiskSpace()
+ protected void buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTableReader> fullyExpiredSSTables)
{
if(!cfs.isCompactionDiskSpaceCheckEnabled() && compactionType == OperationType.COMPACTION)
{
logger.info("Compaction space check is disabled");
- return;
+ return; // try to compact all SSTables
}
+ final Set<SSTableReader> nonExpiredSSTables = Sets.difference(transaction.originals(), fullyExpiredSSTables);
CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
int sstablesRemoved = 0;
- while(true)
+
+ while(!nonExpiredSSTables.isEmpty())
{
- long expectedWriteSize = cfs.getExpectedCompactedFileSize(transaction.originals(), compactionType);
+ // Only consider write size of non expired SSTables
+ long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
long estimatedSSTables = Math.max(1, expectedWriteSize / strategy.getMaxSSTableBytes());
if(cfs.getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
- {
- // we're ok now on space so now we track the failures, if any
- if(sstablesRemoved > 0)
- {
- CompactionManager.instance.incrementCompactionsReduced();
- CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
- }
-
break;
- }
- if (!reduceScopeForLimitedSpace(expectedWriteSize))
+ if (!reduceScopeForLimitedSpace(nonExpiredSSTables, expectedWriteSize))
{
// we end up here if we can't take any more sstables out of the compaction.
// usually means we've run out of disk space
- String msg = String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize);
+ // but we can still compact expired SSTables
+ if(partialCompactionsAcceptable() && fullyExpiredSSTables.size() > 0 )
+ {
+ // sanity check to make sure we compact only fully expired SSTables.
+ assert transaction.originals().equals(fullyExpiredSSTables);
+ break;
+ }
+
+ String msg = String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize);
logger.warn(msg);
CompactionManager.instance.incrementAborted();
throw new RuntimeException(msg);
}
+
sstablesRemoved++;
logger.warn("Not enough space for compaction, {}MB estimated. Reducing scope.",
- (float) expectedWriteSize / 1024 / 1024);
+ (float) expectedWriteSize / 1024 / 1024);
}
+
+ if(sstablesRemoved > 0)
+ {
+ CompactionManager.instance.incrementCompactionsReduced();
+ CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
+ }
+
}
protected int getLevel()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67247394/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
new file mode 100644
index 0000000..3ca01c1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsBytemanTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(BMUnitRunner.class)
+public class CompactionsBytemanTest extends CQLTester
+{
+ /*
+ Return false for the first time hasAvailableDiskSpace is called. i.e first SSTable is too big
+ Create 5 SSTables. After compaction, there should be 2 left - 1 as the 4 SSTables which were merged,
+ and the other the SSTable that was 'too large' and failed the hasAvailableDiskSpace check
+ */
+ @Test
+ @BMRules(rules = { @BMRule(name = "One SSTable too big for remaining disk space test",
+ targetClass = "Directories",
+ targetMethod = "hasAvailableDiskSpace",
+ condition = "not flagged(\"done\")",
+ action = "flag(\"done\"); return false;") } )
+ public void testSSTableNotEnoughDiskSpaceForCompactionGetsDropped() throws Throwable
+ {
+ createLowGCGraceTable();
+ final ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ for (int i = 0; i < 5; i++)
+ {
+ createPossiblyExpiredSSTable(cfs, false);
+ }
+ assertEquals(5, getCurrentColumnFamilyStore().getLiveSSTables().size());
+ cfs.forceMajorCompaction(false);
+ assertEquals(2, getCurrentColumnFamilyStore().getLiveSSTables().size());
+ dropTable("DROP TABLE %s");
+ }
+
+ /*
+ Always return false for hasAvailableDiskSpace. i.e node has no more space
+ Create 2 expired SSTables and 1 long lived one. After compaction, there should only be 1 left,
+ as the 2 expired ones would have been compacted away.
+ */
+ @Test
+ @BMRules(rules = { @BMRule(name = "No disk space with expired SSTables test",
+ targetClass = "Directories",
+ targetMethod = "hasAvailableDiskSpace",
+ action = "return false;") } )
+ public void testExpiredSSTablesStillGetDroppedWithNoDiskSpace() throws Throwable
+ {
+ createLowGCGraceTable();
+ final ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ createPossiblyExpiredSSTable(cfs, true);
+ createPossiblyExpiredSSTable(cfs, true);
+ createPossiblyExpiredSSTable(cfs, false);
+ assertEquals(3, cfs.getLiveSSTables().size());
+ TimeUnit.MILLISECONDS.sleep(1500); // give some time to expire; a (long) cast of 1.5 would truncate to 1 second
+ cfs.forceMajorCompaction(false);
+ assertEquals(1, cfs.getLiveSSTables().size());
+ dropTable("DROP TABLE %s");
+ }
+
+ /*
+ Always return false for hasAvailableDiskSpace. i.e node has no more space
+ Create 2 SSTables. Compaction will not succeed and will throw Runtime Exception
+ */
+ @Test(expected = RuntimeException.class)
+ @BMRules(rules = { @BMRule(name = "No disk space without expired SSTables test",
+ targetClass = "Directories",
+ targetMethod = "hasAvailableDiskSpace",
+ action = "return false;") } )
+ public void testRuntimeExceptionWhenNoDiskSpaceForCompaction() throws Throwable
+ {
+ createLowGCGraceTable();
+ final ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ createPossiblyExpiredSSTable(cfs, false);
+ createPossiblyExpiredSSTable(cfs, false);
+ cfs.forceMajorCompaction(false);
+ dropTable("DROP TABLE %s"); // only reached if the compaction above does not throw as expected
+ }
+
+ private void createPossiblyExpiredSSTable(final ColumnFamilyStore cfs, final boolean expired) throws Throwable
+ {
+ if (expired)
+ {
+ execute("INSERT INTO %s (id, val) values (1, 'expired') USING TTL 1");
+ TimeUnit.MILLISECONDS.sleep(1500); // wait past the 1s TTL; a (long) cast of 1.5 would truncate to 1 second
+ }
+ else
+ {
+ execute("INSERT INTO %s (id, val) values (2, 'immortal')");
+ }
+ cfs.forceBlockingFlush();
+ }
+
+ private void createLowGCGraceTable(){
+ createTable("CREATE TABLE %s (id int PRIMARY KEY, val text) with compaction = {'class':'SizeTieredCompactionStrategy', 'enabled': 'false'} AND gc_grace_seconds=0");
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org