Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/12 18:51:48 UTC
[1/3] cassandra git commit: Check for available disk space before starting a compaction.
Repository: cassandra
Updated Branches:
refs/heads/trunk c04c50c95 -> caeef1740
Check for available disk space before starting a compaction.
Patch by marcuse; reviewed by Joshua McKenzie for CASSANDRA-8562
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c20d4158
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c20d4158
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c20d4158
Branch: refs/heads/trunk
Commit: c20d415833b785bfa5f49d3dd2a7468e111fb5d0
Parents: e4fc395
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 12 11:22:23 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 12 11:35:53 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Directories.java | 18 ++++++++++++++++++
.../cassandra/db/compaction/CompactionTask.java | 16 +++++++++++++++-
.../cassandra/io/util/DiskAwareRunnable.java | 17 +----------------
4 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fc43dfa..9b20a06 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Check for available disk space before starting a compaction (CASSANDRA-8562)
* Fix DISTINCT queries with LIMITs or paging when some partitions
contain only tombstones (CASSANDRA-8490)
* Introduce background cache refreshing to permissions cache
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 69c7a06..8fd1762 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -308,6 +308,24 @@ public class Directories
Collections.sort(candidates);
}
+ public boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize)
+ {
+ long writeSize = expectedTotalWriteSize / estimatedSSTables;
+ long totalAvailable = 0L;
+
+ for (DataDirectory dataDir : dataFileLocations)
+ {
+ if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+ continue;
+ DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
+ // exclude directory if the expected per-sstable write size does not fit in the data directory
+ if (candidate.availableSpace < writeSize)
+ continue;
+ totalAvailable += candidate.availableSpace;
+ }
+ return totalAvailable > expectedTotalWriteSize;
+ }
+
public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
{
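
To make the heuristic above concrete: the expected total write is split evenly across the estimated output sstables, any data directory that cannot hold even one such slice is skipped, and the method asks whether the remaining directories together can absorb the whole write. A minimal standalone sketch of that arithmetic, with plain longs standing in for the Cassandra types (hypothetical values, blacklist handling omitted):

    import java.util.Arrays;
    import java.util.List;

    public class SpaceCheckSketch
    {
        // Mirrors the shape of Directories.hasAvailableDiskSpace: a directory
        // only counts toward the total if it can hold at least one
        // sstable-sized slice of the expected write.
        static boolean hasAvailableDiskSpace(List<Long> availablePerDirectory,
                                             long estimatedSSTables,
                                             long expectedTotalWriteSize)
        {
            long writeSize = expectedTotalWriteSize / estimatedSSTables;
            long totalAvailable = 0L;
            for (long available : availablePerDirectory)
            {
                if (available < writeSize)
                    continue; // too small for even one output sstable
                totalAvailable += available;
            }
            return totalAvailable > expectedTotalWriteSize;
        }

        public static void main(String[] args)
        {
            // Two 10 GiB disks, a 15 GiB compaction expected to produce ~3 sstables:
            // each slice is 5 GiB, both disks qualify, and 20 GiB > 15 GiB.
            List<Long> disks = Arrays.asList(10L << 30, 10L << 30);
            System.out.println(hasAvailableDiskSpace(disks, 3, 15L << 30)); // true
        }
    }

The per-directory filter is what makes the check JBOD-aware: several small disks that merely sum past the total are not enough if none of them can take a single output sstable.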
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 38de8a9..6c6d3a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -100,6 +100,11 @@ public class CompactionTask extends AbstractCompactionTask
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
+ // note that we need a rough early estimate of whether the compaction fits on disk - this is pessimistic,
+ // but since checkAvailableDiskSpace may remove sstables from the compaction, the estimate needs to be done here
+ long earlySSTableEstimate = Math.max(1, cfs.getExpectedCompactedFileSize(toCompact, compactionType) / strategy.getMaxSSTableBytes());
+ checkAvailableDiskSpace(earlySSTableEstimate);
+
// sanity check: all sstables must belong to the same cfs
for (SSTableReader sstable : toCompact)
assert sstable.descriptor.cfname.equals(cfs.name);
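
The early estimate is plain integer arithmetic; the Math.max(1, ...) guard matters because a strategy without a size cap (size-tiered reports an effectively unbounded max sstable size) would otherwise round the quotient down to zero. A worked example with hypothetical numbers:

    // Hypothetical inputs: 3 GiB of expected compacted output, 160 MiB max
    // sstable size (the leveled-compaction default).
    long expectedCompactedSize = 3L << 30;  // 3,221,225,472 bytes
    long maxSSTableBytes = 160L << 20;      //   167,772,160 bytes
    long earlySSTableEstimate = Math.max(1, expectedCompactedSize / maxSSTableBytes); // 19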
@@ -118,7 +123,7 @@ public class CompactionTask extends AbstractCompactionTask
long totalkeysWritten = 0;
long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
- long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+ long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : " + keysPerSSTable);
@@ -293,6 +298,15 @@ public class CompactionTask extends AbstractCompactionTask
logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
}
+ protected void checkAvailableDiskSpace(long estimatedSSTables)
+ {
+ while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, getExpectedWriteSize()))
+ {
+ if (!reduceScopeForLimitedSpace())
+ throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, getExpectedWriteSize()));
+ }
+ }
+
private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
{
assert sstableDirectory != null;
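
checkAvailableDiskSpace turns the space check into an explicit retry loop: while the directories cannot absorb the expected write, the task tries to shrink itself via reduceScopeForLimitedSpace (which drops the largest input sstable), and it fails fast with a RuntimeException once no further reduction is possible. A self-contained sketch of that control flow, with sstables reduced to plain sizes and a fixed hypothetical capacity:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ReduceScopeSketch
    {
        static final long CAPACITY = 100; // hypothetical free space

        static boolean hasAvailableDiskSpace(List<Long> inputSizes)
        {
            long expectedWrite = 0;
            for (long size : inputSizes)
                expectedWrite += size;
            return expectedWrite < CAPACITY;
        }

        // Like CompactionTask.reduceScopeForLimitedSpace: drop the largest
        // input, but only while a partial compaction is still meaningful.
        static boolean reduceScopeForLimitedSpace(List<Long> inputSizes)
        {
            if (inputSizes.size() <= 1)
                return false;
            return inputSizes.remove(Collections.max(inputSizes));
        }

        public static void main(String[] args)
        {
            List<Long> inputs = new ArrayList<>(Arrays.asList(60L, 50L, 30L));
            while (!hasAvailableDiskSpace(inputs))
            {
                if (!reduceScopeForLimitedSpace(inputs))
                    throw new RuntimeException("Not enough space for compaction");
            }
            System.out.println("Compacting " + inputs); // [50, 30]
        }
    }

Note that the real loop re-evaluates getExpectedWriteSize() on every pass, so the expected write can shrink along with the input set.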
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 4188f6e..925efd6 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -24,13 +24,7 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
{
protected Directories.DataDirectory getWriteDirectory(long writeSize)
{
- Directories.DataDirectory directory;
- while (true)
- {
- directory = getDirectories().getWriteableLocation(writeSize);
- if (directory != null || !reduceScopeForLimitedSpace())
- break;
- }
+ Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize);
if (directory == null)
throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
@@ -42,13 +36,4 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
* @return Directories instance for the CF.
*/
protected abstract Directories getDirectories();
-
- /**
- * Called if no disk is available with free space for the full write size.
- * @return true if the scope of the task was successfully reduced.
- */
- public boolean reduceScopeForLimitedSpace()
- {
- return false;
- }
}
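
The DiskAwareRunnable change is the flip side of the new checkAvailableDiskSpace: the retry-and-shrink loop only ever made sense for compactions (the default reduceScopeForLimitedSpace removed here always returned false, so other writers looped at most once), and now that CompactionTask reduces its own scope before writing, getWriteDirectory can make a single placement decision and throw if nothing fits.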
[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/caeef174
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/caeef174
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/caeef174
Branch: refs/heads/trunk
Commit: caeef1740703a309a28c8621e5b4e8107dc1edb7
Parents: c04c50c 75378c2
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 12 18:45:22 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 12 18:45:22 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/Directories.java | 18 ++++++++++++
.../cassandra/db/compaction/CompactionTask.java | 29 ++++++++++++++++----
.../cassandra/io/util/DiskAwareRunnable.java | 17 +-----------
4 files changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/caeef174/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/caeef174/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Directories.java
index 4c4078c,6af3082..fa9b320
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -345,21 -342,27 +345,39 @@@ public class Directorie
Collections.sort(candidates);
}
+ public boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize)
+ {
+ long writeSize = expectedTotalWriteSize / estimatedSSTables;
+ long totalAvailable = 0L;
+
+ for (DataDirectory dataDir : dataDirectories)
+ {
+ if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+ continue;
+ DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
+ // exclude directory if the expected per-sstable write size does not fit in the data directory
+ if (candidate.availableSpace < writeSize)
+ continue;
+ totalAvailable += candidate.availableSpace;
+ }
+ return totalAvailable > expectedTotalWriteSize;
+ }
+
public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
{
- return getOrCreate(desc.directory, SNAPSHOT_SUBDIR, snapshotName);
+ return getSnapshotDirectory(desc.directory, snapshotName);
+ }
+
+ public static File getSnapshotDirectory(File location, String snapshotName)
+ {
+ if (location.getName().startsWith(SECONDARY_INDEX_NAME_SEPARATOR))
+ {
+ return getOrCreate(location.getParentFile(), SNAPSHOT_SUBDIR, snapshotName, location.getName());
+ }
+ else
+ {
+ return getOrCreate(location, SNAPSHOT_SUBDIR, snapshotName);
+ }
}
public File getSnapshotManifestFile(String snapshotName)
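
The new getSnapshotDirectory overload special-cases secondary index directories, whose on-disk name starts with SECONDARY_INDEX_NAME_SEPARATOR: their snapshots are placed under the parent table's snapshot directory, in a subdirectory named after the index. A standalone path sketch, assuming "." as the separator and "snapshots" as the subdirectory name, that only builds paths rather than creating them the way getOrCreate does:

    import java.io.File;

    public class SnapshotPathSketch
    {
        // Assumed constant values, for illustration only.
        static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
        static final String SNAPSHOT_SUBDIR = "snapshots";

        // Mirrors the branch above: index directories snapshot under their
        // parent table's directory, keeping the index name as a subdirectory.
        static File snapshotDirectory(File location, String snapshotName)
        {
            if (location.getName().startsWith(SECONDARY_INDEX_NAME_SEPARATOR))
                return new File(new File(new File(location.getParentFile(), SNAPSHOT_SUBDIR),
                                         snapshotName),
                                location.getName());
            return new File(new File(location, SNAPSHOT_SUBDIR), snapshotName);
        }

        public static void main(String[] args)
        {
            File table = new File("/var/lib/cassandra/data/ks/t-1234");
            File index = new File(table, ".t_idx");
            System.out.println(snapshotDirectory(table, "s1"));
            // /var/lib/cassandra/data/ks/t-1234/snapshots/s1
            System.out.println(snapshotDirectory(index, "s1"));
            // /var/lib/cassandra/data/ks/t-1234/snapshots/s1/.t_idx
        }
    }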
http://git-wip-us.apache.org/repos/asf/cassandra/blob/caeef174/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 8b5058b,eda09c0..dfbdc22
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -19,7 -19,9 +19,8 @@@ package org.apache.cassandra.db.compact
import java.io.File;
import java.io.IOException;
+ import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@@ -149,10 -151,8 +157,10 @@@ public class CompactionTask extends Abs
Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
- long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+ long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+ SSTableFormat.Type sstableFormat = getFormatType(sstables);
+
long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
logger.debug("Expected bloom filter size : {}", keysPerSSTable);
@@@ -278,14 -278,24 +286,23 @@@
return minRepairedAt;
}
+ protected void checkAvailableDiskSpace(long estimatedSSTables)
+ {
+ while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, getExpectedWriteSize()))
+ {
+ if (!reduceScopeForLimitedSpace())
+ throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, getExpectedWriteSize()));
+ }
+ }
+
- private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt)
+ private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt, SSTableFormat.Type type)
{
- assert sstableDirectory != null;
- return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
- keysPerSSTable,
- repairedAt,
- cfs.metadata,
- cfs.partitioner,
- new MetadataCollector(sstables, cfs.metadata.comparator, getLevel()));
+ return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory), type),
+ keysPerSSTable,
+ repairedAt,
+ cfs.metadata,
+ cfs.partitioner,
+ new MetadataCollector(sstables, cfs.metadata.comparator, getLevel()));
}
protected int getLevel()
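
The writer change above reflects trunk's pluggable sstable formats: instead of constructing an SSTableWriter directly, the task now builds a Descriptor carrying an explicit SSTableFormat.Type, obtained from the inputs via getFormatType(sstables) (whose body is not part of this hunk). Purely as an illustration of the kind of reduction such a helper might perform, with a stand-in enum rather than the real SSTableFormat.Type:

    import java.util.Arrays;
    import java.util.List;

    public class FormatTypeSketch
    {
        enum Type { BIG, LEGACY } // stand-in for SSTableFormat.Type

        // Hypothetical: write in the format shared by all inputs, falling back
        // to a default when they disagree. The real getFormatType(..) may
        // choose differently; this only sketches the shape of the decision.
        static Type formatFor(List<Type> inputs, Type defaultType)
        {
            Type first = inputs.get(0);
            for (Type t : inputs)
                if (t != first)
                    return defaultType;
            return first;
        }

        public static void main(String[] args)
        {
            System.out.println(formatFor(Arrays.asList(Type.BIG, Type.BIG), Type.BIG));    // BIG
            System.out.println(formatFor(Arrays.asList(Type.LEGACY, Type.BIG), Type.BIG)); // BIG
        }
    }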
[2/3] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by ma...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/Directories.java
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/75378c20
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/75378c20
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/75378c20
Branch: refs/heads/trunk
Commit: 75378c204c30f5d5f679009885e2ace105793a67
Parents: 5364083 c20d415
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 12 18:43:47 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 12 18:43:47 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/Directories.java | 18 ++++++++++++
.../cassandra/db/compaction/CompactionTask.java | 29 ++++++++++++++++----
.../cassandra/io/util/DiskAwareRunnable.java | 17 +-----------
4 files changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75378c20/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 55ca55d,9b20a06..f2e25c4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,56 -1,5 +1,57 @@@
-2.0.12:
+2.1.3
+ * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
+ * Fix case-sensitivity of index name on CREATE and DROP INDEX
+ statements (CASSANDRA-8365)
+ * Better detection/logging for corruption in compressed sstables (CASSANDRA-8192)
+ * Use the correct repairedAt value when closing writer (CASSANDRA-8570)
+ * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
+ * Properly calculate expected write size during compaction (CASSANDRA-8532)
+ * Invalidate affected prepared statements when a table's columns
+ are altered (CASSANDRA-7910)
+ * Stress - user defined writes should populate sequentially (CASSANDRA-8524)
+ * Fix regression in SSTableRewriter causing some rows to become unreadable
+ during compaction (CASSANDRA-8429)
+ * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
+ * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
+ is disabled (CASSANDRA-8288)
+ * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
+ * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
+ * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
+ * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
+ * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
+ * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
+ * Disable mmap on Windows (CASSANDRA-6993)
+ * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
+ * Add auth support to cassandra-stress (CASSANDRA-7985)
+ * Fix ArrayIndexOutOfBoundsException when generating error message
+ for some CQL syntax errors (CASSANDRA-8455)
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
+ * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
+ * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
+ * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
+ * Remove tmplink files for offline compactions (CASSANDRA-8321)
+ * Reduce maxHintsInProgress (CASSANDRA-8415)
+ * BTree updates may call provided update function twice (CASSANDRA-8018)
+ * Release sstable references after anticompaction (CASSANDRA-8386)
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
+ * Fix high size calculations for prepared statements (CASSANDRA-8231)
+ * Centralize shared executors (CASSANDRA-8055)
+ * Fix filtering for CONTAINS (KEY) relations on frozen collection
+ clustering columns when the query is restricted to a single
+ partition (CASSANDRA-8203)
+ * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
+ * Add more log info if readMeter is null (CASSANDRA-8238)
+ * Add check of the system wall clock time at startup (CASSANDRA-8305)
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+ * Fix incremental repair not removing parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
+ * Log failed host when preparing incremental repair (CASSANDRA-8228)
+ * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
+Merged from 2.0:
+ * Check for available disk space before starting a compaction (CASSANDRA-8562)
* Fix DISTINCT queries with LIMITs or paging when some partitions
contain only tombstones (CASSANDRA-8490)
* Introduce background cache refreshing to permissions cache
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75378c20/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Directories.java
index eb33bd8,8fd1762..6af3082
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -342,6 -308,25 +342,24 @@@ public class Directorie
Collections.sort(candidates);
}
+ public boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize)
+ {
+ long writeSize = expectedTotalWriteSize / estimatedSSTables;
+ long totalAvailable = 0L;
+
- for (DataDirectory dataDir : dataFileLocations)
++ for (DataDirectory dataDir : dataDirectories)
+ {
+ if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+ continue;
+ DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
+ // exclude directory if the expected per-sstable write size does not fit in the data directory
+ if (candidate.availableSpace < writeSize)
+ continue;
+ totalAvailable += candidate.availableSpace;
+ }
+ return totalAvailable > expectedTotalWriteSize;
+ }
+
-
public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
{
return getOrCreate(desc.directory, SNAPSHOT_SUBDIR, snapshotName);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/75378c20/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d215b4c,6c6d3a2..eda09c0
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -19,19 -19,10 +19,20 @@@ package org.apache.cassandra.db.compact
import java.io.File;
import java.io.IOException;
-import java.util.*;
++import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Throwables;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@@ -83,18 -68,18 +84,20 @@@ public class CompactionTask extends Abs
public boolean reduceScopeForLimitedSpace()
{
- if (partialCompactionsAcceptable() && toCompact.size() > 1)
+ if (partialCompactionsAcceptable() && sstables.size() > 1)
{
// Try again w/o the largest one.
- logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
+ logger.warn("insufficient space to compact all requested files {}", StringUtils.join(sstables, ", "));
// Note that we have removed files that are still marked as compacting.
+ // This is suboptimal but ok since the caller will unmark all the sstables at the end.
- return sstables.remove(cfs.getMaxSizeFile(sstables));
- }
- else
- {
- return false;
- return toCompact.remove(cfs.getMaxSizeFile(toCompact));
- }
- else
- {
- return false;
++ SSTableReader removedSSTable = cfs.getMaxSizeFile(sstables);
++ if (sstables.remove(removedSSTable))
++ {
++ cfs.getDataTracker().unmarkCompacting(Arrays.asList(removedSSTable));
++ return true;
++ }
}
++ return false;
}
/**
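
Compared with the 2.0 version of this method, which knowingly left the dropped sstable marked as compacting and relied on the caller unmarking everything at the end, the merged version releases the mark immediately via unmarkCompacting. Presumably this is needed because the removed sstable is no longer in the set the 2.1 code path cleans up, and unmarking it right away also frees it for other compactions while this one runs.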
@@@ -118,159 -100,214 +121,173 @@@
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
+ // note that we need a rough early estimate of whether the compaction fits on disk - this is pessimistic,
++ // but since checkAvailableDiskSpace may remove sstables from the compaction, the estimate needs to be done here
- long earlySSTableEstimate = Math.max(1, cfs.getExpectedCompactedFileSize(toCompact, compactionType) / strategy.getMaxSSTableBytes());
++ long earlySSTableEstimate = Math.max(1, cfs.getExpectedCompactedFileSize(sstables, compactionType) / strategy.getMaxSSTableBytes());
+ checkAvailableDiskSpace(earlySSTableEstimate);
+
// sanity check: all sstables must belong to the same cfs
- for (SSTableReader sstable : toCompact)
- assert sstable.descriptor.cfname.equals(cfs.name);
-
- UUID taskId = SystemKeyspace.startCompaction(cfs, toCompact);
+ assert !Iterables.any(sstables, new Predicate<SSTableReader>()
+ {
+ @Override
+ public boolean apply(SSTableReader sstable)
+ {
+ return !sstable.descriptor.cfname.equals(cfs.name);
+ }
+ });
- CompactionController controller = getCompactionController(toCompact);
- Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
+ UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
- logger.info("Compacting {}", toCompact);
+ logger.info("Compacting {}", sstables);
long start = System.nanoTime();
- long totalkeysWritten = 0;
-
- long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
- long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
- long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
- if (logger.isDebugEnabled())
- logger.debug("Expected bloom filter size : " + keysPerSSTable);
-
- AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
- ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
- : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
- Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
-
- // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
- // replace the old entries. Track entries to preheat here until then.
- Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap = new HashMap<Descriptor, Map<DecoratedKey, RowIndexEntry>>();
-
- Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
- Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
- if (collector != null)
- collector.beginCompaction(ci);
- try
+ long totalKeysWritten = 0;
+
+ try (CompactionController controller = getCompactionController(sstables);)
{
- if (!iter.hasNext())
- {
- // don't mark compacted in the finally block, since if there _is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is no period during which
- // a crash could cause data loss.
- cfs.markObsolete(toCompact, compactionType);
- return;
- }
+ Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
- long writeSize = getExpectedWriteSize() / estimatedSSTables;
- Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
- SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
- writers.add(writer);
- while (iter.hasNext())
- {
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
+ long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
- long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
++ long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
+ long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+ long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
+ logger.debug("Expected bloom filter size : {}", keysPerSSTable);
- AbstractCompactedRow row = iter.next();
- RowIndexEntry indexEntry = writer.append(row);
- if (indexEntry == null)
- {
- controller.invalidateCachedRow(row.key);
- row.close();
- continue;
- }
+ List<SSTableReader> newSStables;
+ AbstractCompactionIterable ci;
- totalkeysWritten++;
-
- if (DatabaseDescriptor.getPreheatKeyCache())
+ // SSTableScanners need to be closed before the markCompactedSSTablesReplaced call, as scanners hold references
+ // to both ifile and dfile, and SSTR will throw deletion errors on Windows if it tries to delete before the scanner is closed.
+ // See CASSANDRA-8019 and CASSANDRA-8399
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+ {
+ ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+ // replace the old entries. Track entries to preheat here until then.
+ long minRepairedAt = getMinRepairedAt(actuallyCompact);
+ // we only need the age of the data that we're actually retaining
+ long maxAge = getMaxDataAge(actuallyCompact);
+ if (collector != null)
+ collector.beginCompaction(ci);
+ long lastCheckObsoletion = start;
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
+ try
{
- for (SSTableReader sstable : actuallyCompact)
+ if (!iter.hasNext())
+ {
+ // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+ // we need to sync it (via closeAndOpen) first, so there is no period during which
+ // a crash could cause data loss.
+ cfs.markObsolete(sstables, compactionType);
+ return;
+ }
+
+ writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
+ while (iter.hasNext())
{
- if (sstable.getCachedPosition(row.key, false) != null)
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+ AbstractCompactedRow row = iter.next();
+ if (writer.append(row) != null)
+ {
+ totalKeysWritten++;
+ if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+ {
+ writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
+ }
+ }
+
+ if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
{
- cachedKeys.put(row.key, indexEntry);
- break;
+ controller.maybeRefreshOverlaps();
+ lastCheckObsoletion = System.nanoTime();
}
}
- }
- if (newSSTableSegmentThresholdReached(writer))
+ // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+ newSStables = writer.finish();
+ }
+ catch (Throwable t)
{
- // tmp = false because later we want to query it with descriptor from SSTableReader
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- writeSize = getExpectedWriteSize() / estimatedSSTables;
- dataDirectory = getWriteDirectory(writeSize);
- writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
- writers.add(writer);
- cachedKeys = new HashMap<>();
+ writer.abort();
+ throw t;
}
- }
-
- if (writer.getFilePointer() > 0)
- {
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- }
- else
- {
- writer.abort();
- writers.remove(writer);
- }
+ finally
+ {
+ // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+ // (in replaceCompactedSSTables)
+ if (taskId != null)
+ SystemKeyspace.finishCompaction(taskId);
- long maxAge = getMaxDataAge(toCompact);
- for (SSTableWriter completedWriter : writers)
- sstables.add(completedWriter.closeAndOpenReader(maxAge));
- }
- catch (Throwable t)
- {
- for (SSTableWriter writer : writers)
- writer.abort();
- // also remove already completed SSTables
- for (SSTableReader sstable : sstables)
- {
- sstable.markObsolete();
- sstable.releaseReference();
+ if (collector != null)
+ collector.finishCompaction(ci);
+ }
}
- throw Throwables.propagate(t);
- }
- finally
- {
- controller.close();
-
- // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
- // (in replaceCompactedSSTables)
- if (taskId != null)
- SystemKeyspace.finishCompaction(taskId);
- if (collector != null)
- collector.finishCompaction(ci);
-
- try
- {
- // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
- // we don't end up with compaction information hanging around indefinitely in limbo.
- iter.close();
- }
- catch (IOException e)
+ Collection<SSTableReader> oldSStables = this.sstables;
+ if (!offline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
{
- throw new RuntimeException(e);
- }
- }
+ long count = counts[i];
+ if (count == 0)
+ continue;
- replaceCompactedSSTables(toCompact, sstables);
- // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
- for (SSTableReader sstable : sstables)
- {
- if (sstable.acquireReference())
- {
- try
- {
- sstable.preheat(cachedKeyMap.get(sstable.descriptor));
- }
- finally
- {
- sstable.releaseReference();
- }
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
}
- }
- // log a bunch of statistics about the result and save to system table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTable.getTotalBytes(toCompact);
- long endsize = SSTable.getTotalBytes(sstables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder builder = new StringBuilder();
- for (SSTableReader reader : sstables)
- builder.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+ logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+ oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
+ }
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
+ {
+ long minRepairedAt = Long.MAX_VALUE;
+ for (SSTableReader sstable : actuallyCompact)
+ minRepairedAt = Math.min(minRepairedAt, sstable.getSSTableMetadata().repairedAt);
+ if (minRepairedAt == Long.MAX_VALUE)
+ return ActiveRepairService.UNREPAIRED_SSTABLE;
+ return minRepairedAt;
}
+ protected void checkAvailableDiskSpace(long estimatedSSTables)
+ {
+ while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, getExpectedWriteSize()))
+ {
+ if (!reduceScopeForLimitedSpace())
+ throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, getExpectedWriteSize()));
+ }
+ }
+
- private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
+ private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt)
{
assert sstableDirectory != null;
return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),