Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/12 18:50:40 UTC

[1/2] cassandra git commit: Check for available disk space before starting a compaction.

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 536408380 -> 75378c204


Check for available disk space before starting a compaction.

Patch by marcuse; reviewed by Joshua McKenzie for CASSANDRA-8562


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c20d4158
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c20d4158
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c20d4158

Branch: refs/heads/cassandra-2.1
Commit: c20d415833b785bfa5f49d3dd2a7468e111fb5d0
Parents: e4fc395
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 12 11:22:23 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 12 11:35:53 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                       |  1 +
 src/java/org/apache/cassandra/db/Directories.java | 18 ++++++++++++++++++
 .../cassandra/db/compaction/CompactionTask.java   | 16 +++++++++++++++-
 .../cassandra/io/util/DiskAwareRunnable.java      | 17 +----------------
 4 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fc43dfa..9b20a06 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Check for available disk space before starting a compaction (CASSANDRA-8562)
  * Fix DISTINCT queries with LIMITs or paging when some partitions
    contain only tombstones (CASSANDRA-8490)
  * Introduce background cache refreshing to permissions cache

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 69c7a06..8fd1762 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -308,6 +308,24 @@ public class Directories
         Collections.sort(candidates);
     }
 
+    public boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize)
+    {
+        long writeSize = expectedTotalWriteSize / estimatedSSTables;
+        long totalAvailable = 0L;
+
+        for (DataDirectory dataDir : dataFileLocations)
+        {
+            if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+                  continue;
+            DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
+            // exclude directory if its total writeSize does not fit to data directory
+            if (candidate.availableSpace < writeSize)
+                continue;
+            totalAvailable += candidate.availableSpace;
+        }
+        return totalAvailable > expectedTotalWriteSize;
+    }
+
 
     public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
     {
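
To make the arithmetic of the new check easier to follow, here is a minimal, self-contained sketch of the same accounting, with hypothetical free-space figures and with the blacklisted-directory filtering left out: the expected output is split evenly across the estimated number of sstables, any data directory whose free space cannot hold even one such sstable is ignored, and the remaining directories' combined free space must exceed the total expected write size.

public class DiskSpaceCheckSketch
{
    // Standalone model of the hasAvailableDiskSpace check added above; blacklisted
    // directories are simply omitted from the input array in this sketch.
    static boolean hasAvailableDiskSpace(long[] freeSpacePerDirectory, long estimatedSSTables, long expectedTotalWriteSize)
    {
        long writeSize = expectedTotalWriteSize / estimatedSSTables; // rough per-sstable slice of the output
        long totalAvailable = 0L;
        for (long available : freeSpacePerDirectory)
        {
            // a directory that cannot hold even one output sstable is not counted at all
            if (available < writeSize)
                continue;
            totalAvailable += available;
        }
        // the remaining directories together must fit the whole compaction
        return totalAvailable > expectedTotalWriteSize;
    }

    public static void main(String[] args)
    {
        long gb = 1L << 30;
        long[] freeSpace = { 40 * gb, 5 * gb, 30 * gb }; // three hypothetical data directories
        long expectedTotalWriteSize = 60 * gb;           // hypothetical size of the compaction output
        long estimatedSSTables = 6;                      // so each output sstable is roughly 10 GB

        // the 5 GB directory is skipped (it cannot hold a 10 GB sstable); the other two
        // provide 70 GB > 60 GB, so the check passes
        System.out.println(hasAvailableDiskSpace(freeSpace, estimatedSSTables, expectedTotalWriteSize));
    }
}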

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 38de8a9..6c6d3a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -100,6 +100,11 @@ public class CompactionTask extends AbstractCompactionTask
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
 
+        // note that we need to do a rough estimate early if we can fit the compaction on disk - this is pessimistic, but
+        // since we might remove sstables from the compaction in checkAvailableDiskSpace it needs to be done here
+        long earlySSTableEstimate = Math.max(1, cfs.getExpectedCompactedFileSize(toCompact, compactionType) / strategy.getMaxSSTableBytes());
+        checkAvailableDiskSpace(earlySSTableEstimate);
+
         // sanity check: all sstables must belong to the same cfs
         for (SSTableReader sstable : toCompact)
             assert sstable.descriptor.cfname.equals(cfs.name);
@@ -118,7 +123,7 @@ public class CompactionTask extends AbstractCompactionTask
         long totalkeysWritten = 0;
 
         long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
-        long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+        long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : " + keysPerSSTable);
@@ -293,6 +298,15 @@ public class CompactionTask extends AbstractCompactionTask
         logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
     }
 
+    protected void checkAvailableDiskSpace(long estimatedSSTables)
+    {
+        while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, getExpectedWriteSize()))
+        {
+            if (!reduceScopeForLimitedSpace())
+                throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, getExpectedWriteSize()));
+        }
+    }
+
     private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
     {
         assert sstableDirectory != null;
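
The interaction between checkAvailableDiskSpace and reduceScopeForLimitedSpace is easiest to see in isolation. The sketch below is a simplified, self-contained model of the retry loop, under the assumption (for brevity) that the expected write size is simply the sum of the input sstable sizes, whereas the real code asks the column family store for a compaction-type-aware estimate; all sizes are hypothetical.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CompactionScopeSketch
{
    public static void main(String[] args)
    {
        long gb = 1L << 30;
        // hypothetical input sstable sizes and free disk space
        List<Long> sstableSizes = new ArrayList<>(Arrays.asList(25 * gb, 15 * gb, 10 * gb, 5 * gb));
        long availableSpace = 35 * gb;

        // mirror of checkAvailableDiskSpace: keep shrinking the compaction until it fits, or fail
        while (expectedWriteSize(sstableSizes) > availableSpace)
        {
            if (!reduceScope(sstableSizes))
                throw new RuntimeException(String.format("Not enough space for compaction, expected write size = %d",
                                                         expectedWriteSize(sstableSizes)));
            System.out.println("insufficient space, retrying without the largest sstable: " + sstableSizes);
        }
        System.out.println("compacting " + sstableSizes + ", expected write size " + expectedWriteSize(sstableSizes));
    }

    // simplifying assumption: the compacted output is as large as the sum of its inputs
    static long expectedWriteSize(List<Long> sizes)
    {
        long total = 0L;
        for (long size : sizes)
            total += size;
        return total;
    }

    // mirror of reduceScopeForLimitedSpace: drop the largest input, refuse once only one remains
    static boolean reduceScope(List<Long> sizes)
    {
        if (sizes.size() <= 1)
            return false;
        return sizes.remove(Collections.max(sizes));
    }
}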

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20d4158/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 4188f6e..925efd6 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -24,13 +24,7 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
 {
     protected Directories.DataDirectory getWriteDirectory(long writeSize)
     {
-        Directories.DataDirectory directory;
-        while (true)
-        {
-            directory = getDirectories().getWriteableLocation(writeSize);
-            if (directory != null || !reduceScopeForLimitedSpace())
-                break;
-        }
+        Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize);
         if (directory == null)
             throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
 
@@ -42,13 +36,4 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
      * @return Directories instance for the CF.
      */
     protected abstract Directories getDirectories();
-
-    /**
-     * Called if no disk is available with free space for the full write size.
-     * @return true if the scope of the task was successfully reduced.
-     */
-    public boolean reduceScopeForLimitedSpace()
-    {
-        return false;
-    }
 }
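
With the pre-flight check in place, DiskAwareRunnable no longer owns the retry: getWriteDirectory now fails fast when no single data directory can take the write. The sketch below is a self-contained model of that simplified contract, with an integer index standing in for Directories.DataDirectory and hypothetical free-space figures.

public class WriteDirectorySketch
{
    public static void main(String[] args)
    {
        long gb = 1L << 30;
        long[] freeSpacePerDirectory = { 8 * gb, 12 * gb }; // hypothetical data directories
        System.out.println("writing to directory " + getWriteDirectory(freeSpacePerDirectory, 10 * gb));
    }

    // model of the simplified getWriteDirectory: no scope-reduction loop any more,
    // a missing writeable location is immediately fatal
    static int getWriteDirectory(long[] freeSpacePerDirectory, long writeSize)
    {
        Integer directory = getWriteableLocation(freeSpacePerDirectory, writeSize);
        if (directory == null)
            throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
        return directory;
    }

    // stand-in for Directories.getWriteableLocation: first directory with enough free space, else null
    static Integer getWriteableLocation(long[] freeSpacePerDirectory, long writeSize)
    {
        for (int i = 0; i < freeSpacePerDirectory.length; i++)
            if (freeSpacePerDirectory[i] >= writeSize)
                return i;
        return null;
    }
}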


[2/2] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by ma...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/Directories.java
	src/java/org/apache/cassandra/db/compaction/CompactionTask.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/75378c20
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/75378c20
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/75378c20

Branch: refs/heads/cassandra-2.1
Commit: 75378c204c30f5d5f679009885e2ace105793a67
Parents: 5364083 c20d415
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jan 12 18:43:47 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jan 12 18:43:47 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/Directories.java    | 18 ++++++++++++
 .../cassandra/db/compaction/CompactionTask.java | 29 ++++++++++++++++----
 .../cassandra/io/util/DiskAwareRunnable.java    | 17 +-----------
 4 files changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/75378c20/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 55ca55d,9b20a06..f2e25c4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,56 -1,5 +1,57 @@@
 -2.0.12:
 +2.1.3
 + * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537)
 + * Fix case-sensitivity of index name on CREATE and DROP INDEX
 +   statements (CASSANDRA-8365)
 + * Better detection/logging for corruption in compressed sstables (CASSANDRA-8192)
 + * Use the correct repairedAt value when closing writer (CASSANDRA-8570)
 + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512)
 + * Properly calculate expected write size during compaction (CASSANDRA-8532)
 + * Invalidate affected prepared statements when a table's columns
 +   are altered (CASSANDRA-7910)
 + * Stress - user defined writes should populate sequentally (CASSANDRA-8524)
 + * Fix regression in SSTableRewriter causing some rows to become unreadable 
 +   during compaction (CASSANDRA-8429)
 + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
 + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
 +   is disabled (CASSANDRA-8288)
 + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
 + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
 + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
 + * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
 + * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
 + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
 + * Disable mmap on Windows (CASSANDRA-6993)
 + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
 + * Add auth support to cassandra-stress (CASSANDRA-7985)
 + * Fix ArrayIndexOutOfBoundsException when generating error message
 +   for some CQL syntax errors (CASSANDRA-8455)
 + * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
 + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
 + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
 + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
 + * Remove tmplink files for offline compactions (CASSANDRA-8321)
 + * Reduce maxHintsInProgress (CASSANDRA-8415)
 + * BTree updates may call provided update function twice (CASSANDRA-8018)
 + * Release sstable references after anticompaction (CASSANDRA-8386)
 + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 + * Fix high size calculations for prepared statements (CASSANDRA-8231)
 + * Centralize shared executors (CASSANDRA-8055)
 + * Fix filtering for CONTAINS (KEY) relations on frozen collection
 +   clustering columns when the query is restricted to a single
 +   partition (CASSANDRA-8203)
 + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
 + * Add more log info if readMeter is null (CASSANDRA-8238)
 + * add check of the system wall clock time at startup (CASSANDRA-8305)
 + * Support for frozen collections (CASSANDRA-7859)
 + * Fix overflow on histogram computation (CASSANDRA-8028)
 + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
 + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
 + * Improve JBOD disk utilization (CASSANDRA-7386)
 + * Log failed host when preparing incremental repair (CASSANDRA-8228)
 + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
 +Merged from 2.0:
+  * Check for available disk space before starting a compaction (CASSANDRA-8562)
   * Fix DISTINCT queries with LIMITs or paging when some partitions
     contain only tombstones (CASSANDRA-8490)
   * Introduce background cache refreshing to permissions cache

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75378c20/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Directories.java
index eb33bd8,8fd1762..6af3082
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@@ -342,6 -308,25 +342,24 @@@ public class Directorie
          Collections.sort(candidates);
      }
  
+     public boolean hasAvailableDiskSpace(long estimatedSSTables, long expectedTotalWriteSize)
+     {
+         long writeSize = expectedTotalWriteSize / estimatedSSTables;
+         long totalAvailable = 0L;
+ 
 -        for (DataDirectory dataDir : dataFileLocations)
++        for (DataDirectory dataDir : dataDirectories)
+         {
+             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
+                   continue;
+             DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
+             // exclude directory if its total writeSize does not fit to data directory
+             if (candidate.availableSpace < writeSize)
+                 continue;
+             totalAvailable += candidate.availableSpace;
+         }
+         return totalAvailable > expectedTotalWriteSize;
+     }
+ 
 -
      public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
      {
          return getOrCreate(desc.directory, SNAPSHOT_SUBDIR, snapshotName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/75378c20/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index d215b4c,6c6d3a2..eda09c0
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -19,19 -19,10 +19,20 @@@ package org.apache.cassandra.db.compact
  
  import java.io.File;
  import java.io.IOException;
 -import java.util.*;
++import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
  import java.util.concurrent.TimeUnit;
  
 -import com.google.common.base.Throwables;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.Iterables;
  import com.google.common.collect.Sets;
  import org.apache.commons.lang3.StringUtils;
  import org.slf4j.Logger;
@@@ -83,18 -68,18 +84,20 @@@ public class CompactionTask extends Abs
  
      public boolean reduceScopeForLimitedSpace()
      {
 -        if (partialCompactionsAcceptable() && toCompact.size() > 1)
 +        if (partialCompactionsAcceptable() && sstables.size() > 1)
          {
              // Try again w/o the largest one.
 -            logger.warn("insufficient space to compact all requested files " + StringUtils.join(toCompact, ", "));
 +            logger.warn("insufficient space to compact all requested files {}", StringUtils.join(sstables, ", "));
              // Note that we have removed files that are still marked as compacting.
              // This suboptimal but ok since the caller will unmark all the sstables at the end.
-             return sstables.remove(cfs.getMaxSizeFile(sstables));
-         }
-         else
-         {
-             return false;
 -            return toCompact.remove(cfs.getMaxSizeFile(toCompact));
 -        }
 -        else
 -        {
 -            return false;
++            SSTableReader removedSSTable = cfs.getMaxSizeFile(sstables);
++            if (sstables.remove(removedSSTable))
++            {
++                cfs.getDataTracker().unmarkCompacting(Arrays.asList(removedSSTable));
++                return true;
++            }
          }
++        return false;
      }
  
      /**
@@@ -118,159 -100,214 +121,173 @@@
          if (DatabaseDescriptor.isSnapshotBeforeCompaction())
              cfs.snapshotWithoutFlush(System.currentTimeMillis() + "-compact-" + cfs.name);
  
+         // note that we need to do a rough estimate early if we can fit the compaction on disk - this is pessimistic, but
+         // since we might remove sstables from the compaction in checkAvailableDiskSpace it needs to be done here
 -        long earlySSTableEstimate = Math.max(1, cfs.getExpectedCompactedFileSize(toCompact, compactionType) / strategy.getMaxSSTableBytes());
++        long earlySSTableEstimate = Math.max(1, cfs.getExpectedCompactedFileSize(sstables, compactionType) / strategy.getMaxSSTableBytes());
+         checkAvailableDiskSpace(earlySSTableEstimate);
+ 
          // sanity check: all sstables must belong to the same cfs
 -        for (SSTableReader sstable : toCompact)
 -            assert sstable.descriptor.cfname.equals(cfs.name);
 -
 -        UUID taskId = SystemKeyspace.startCompaction(cfs, toCompact);
 +        assert !Iterables.any(sstables, new Predicate<SSTableReader>()
 +        {
 +            @Override
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return !sstable.descriptor.cfname.equals(cfs.name);
 +            }
 +        });
  
 -        CompactionController controller = getCompactionController(toCompact);
 -        Set<SSTableReader> actuallyCompact = Sets.difference(toCompact, controller.getFullyExpiredSSTables());
 +        UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
  
          // new sstables from flush can be added during a compaction, but only the compaction can remove them,
          // so in our single-threaded compaction world this is a valid way of determining if we're compacting
          // all the sstables (that existed when we started)
 -        logger.info("Compacting {}", toCompact);
 +        logger.info("Compacting {}", sstables);
  
          long start = System.nanoTime();
 -        long totalkeysWritten = 0;
 -
 -        long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
 -        long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
 -        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
 -        if (logger.isDebugEnabled())
 -            logger.debug("Expected bloom filter size : " + keysPerSSTable);
 -
 -        AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
 -                                      ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
 -                                      : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
 -        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
 -        Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
 -
 -        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
 -        // replace the old entries.  Track entries to preheat here until then.
 -        Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap =  new HashMap<Descriptor, Map<DecoratedKey, RowIndexEntry>>();
 -
 -        Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
 -        Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
 -
 -        if (collector != null)
 -            collector.beginCompaction(ci);
 -        try
 +        long totalKeysWritten = 0;
 +
 +        try (CompactionController controller = getCompactionController(sstables);)
          {
 -            if (!iter.hasNext())
 -            {
 -                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
 -                // we need to sync it (via closeAndOpen) first, so there is no period during which
 -                // a crash could cause data loss.
 -                cfs.markObsolete(toCompact, compactionType);
 -                return;
 -            }
 +            Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
  
 -            long writeSize = getExpectedWriteSize() / estimatedSSTables;
 -            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
 -            SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
 -            writers.add(writer);
 -            while (iter.hasNext())
 -            {
 -                if (ci.isStopRequested())
 -                    throw new CompactionInterruptedException(ci.getCompactionInfo());
 +            long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
-             long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
++            long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
 +            long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
 +            long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
 +            logger.debug("Expected bloom filter size : {}", keysPerSSTable);
  
 -                AbstractCompactedRow row = iter.next();
 -                RowIndexEntry indexEntry = writer.append(row);
 -                if (indexEntry == null)
 -                {
 -                    controller.invalidateCachedRow(row.key);
 -                    row.close();
 -                    continue;
 -                }
 +            List<SSTableReader> newSStables;
 +            AbstractCompactionIterable ci;
  
 -                totalkeysWritten++;
 -
 -                if (DatabaseDescriptor.getPreheatKeyCache())
 +            // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
 +            // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
 +            // See CASSANDRA-8019 and CASSANDRA-8399
 +            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
 +            {
 +                ci = new CompactionIterable(compactionType, scanners.scanners, controller);
 +                Iterator<AbstractCompactedRow> iter = ci.iterator();
 +                // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
 +                // replace the old entries.  Track entries to preheat here until then.
 +                long minRepairedAt = getMinRepairedAt(actuallyCompact);
 +                // we only need the age of the data that we're actually retaining
 +                long maxAge = getMaxDataAge(actuallyCompact);
 +                if (collector != null)
 +                    collector.beginCompaction(ci);
 +                long lastCheckObsoletion = start;
 +                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
 +                try
                  {
 -                    for (SSTableReader sstable : actuallyCompact)
 +                    if (!iter.hasNext())
 +                    {
 +                        // don't mark compacted in the finally block, since if there _is_ nondeleted data,
 +                        // we need to sync it (via closeAndOpen) first, so there is no period during which
 +                        // a crash could cause data loss.
 +                        cfs.markObsolete(sstables, compactionType);
 +                        return;
 +                    }
 +
 +                    writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
 +                    while (iter.hasNext())
                      {
 -                        if (sstable.getCachedPosition(row.key, false) != null)
 +                        if (ci.isStopRequested())
 +                            throw new CompactionInterruptedException(ci.getCompactionInfo());
 +
 +                        AbstractCompactedRow row = iter.next();
 +                        if (writer.append(row) != null)
 +                        {
 +                            totalKeysWritten++;
 +                            if (newSSTableSegmentThresholdReached(writer.currentWriter()))
 +                            {
 +                                writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
 +                            }
 +                        }
 +
 +                        if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
                          {
 -                            cachedKeys.put(row.key, indexEntry);
 -                            break;
 +                            controller.maybeRefreshOverlaps();
 +                            lastCheckObsoletion = System.nanoTime();
                          }
                      }
 -                }
  
 -                if (newSSTableSegmentThresholdReached(writer))
 +                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
 +                    newSStables = writer.finish();
 +                }
 +                catch (Throwable t)
                  {
 -                    // tmp = false because later we want to query it with descriptor from SSTableReader
 -                    cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
 -                    writeSize = getExpectedWriteSize() / estimatedSSTables;
 -                    dataDirectory = getWriteDirectory(writeSize);
 -                    writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
 -                    writers.add(writer);
 -                    cachedKeys = new HashMap<>();
 +                    writer.abort();
 +                    throw t;
                  }
 -            }
 -
 -            if (writer.getFilePointer() > 0)
 -            {
 -                cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
 -            }
 -            else
 -            {
 -                writer.abort();
 -                writers.remove(writer);
 -            }
 +                finally
 +                {
 +                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
 +                    // (in replaceCompactedSSTables)
 +                    if (taskId != null)
 +                        SystemKeyspace.finishCompaction(taskId);
  
 -            long maxAge = getMaxDataAge(toCompact);
 -            for (SSTableWriter completedWriter : writers)
 -                sstables.add(completedWriter.closeAndOpenReader(maxAge));
 -        }
 -        catch (Throwable t)
 -        {
 -            for (SSTableWriter writer : writers)
 -                writer.abort();
 -            // also remove already completed SSTables
 -            for (SSTableReader sstable : sstables)
 -            {
 -                sstable.markObsolete();
 -                sstable.releaseReference();
 +                    if (collector != null)
 +                        collector.finishCompaction(ci);
 +                }
              }
 -            throw Throwables.propagate(t);
 -        }
 -        finally
 -        {
 -            controller.close();
 -
 -            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
 -            // (in replaceCompactedSSTables)
 -            if (taskId != null)
 -                SystemKeyspace.finishCompaction(taskId);
  
 -            if (collector != null)
 -                collector.finishCompaction(ci);
 -
 -            try
 -            {
 -                // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
 -                // we don't end up with compaction information hanging around indefinitely in limbo.
 -                iter.close();
 -            }
 -            catch (IOException e)
 +            Collection<SSTableReader> oldSStables = this.sstables;
 +            if (!offline)
 +                cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
 +
 +            // log a bunch of statistics about the result and save to system table compaction_history
 +            long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
 +            long startsize = SSTableReader.getTotalBytes(oldSStables);
 +            long endsize = SSTableReader.getTotalBytes(newSStables);
 +            double ratio = (double) endsize / (double) startsize;
 +
 +            StringBuilder newSSTableNames = new StringBuilder();
 +            for (SSTableReader reader : newSStables)
 +                newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
 +
 +            double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
 +            long totalSourceRows = 0;
 +            long[] counts = ci.getMergedRowCounts();
 +            StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
 +            Map<Integer, Long> mergedRows = new HashMap<>();
 +            for (int i = 0; i < counts.length; i++)
              {
 -                throw new RuntimeException(e);
 -            }
 -        }
 +                long count = counts[i];
 +                if (count == 0)
 +                    continue;
  
 -        replaceCompactedSSTables(toCompact, sstables);
 -        // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
 -        for (SSTableReader sstable : sstables)
 -        {
 -            if (sstable.acquireReference())
 -            {
 -                try
 -                {
 -                    sstable.preheat(cachedKeyMap.get(sstable.descriptor));
 -                }
 -                finally
 -                {
 -                    sstable.releaseReference();
 -                }
 +                int rows = i + 1;
 +                totalSourceRows += rows * count;
 +                mergeSummary.append(String.format("%d:%d, ", rows, count));
 +                mergedRows.put(rows, count);
              }
 -        }
  
 -        // log a bunch of statistics about the result and save to system table compaction_history
 -        long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
 -        long startsize = SSTable.getTotalBytes(toCompact);
 -        long endsize = SSTable.getTotalBytes(sstables);
 -        double ratio = (double) endsize / (double) startsize;
 -
 -        StringBuilder builder = new StringBuilder();
 -        for (SSTableReader reader : sstables)
 -            builder.append(reader.descriptor.baseFilename()).append(",");
 -
 -        double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
 -        long totalSourceRows = 0;
 -        long[] counts = ci.getMergedRowCounts();
 -        StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
 -        Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
 -        for (int i = 0; i < counts.length; i++)
 -        {
 -            long count = counts[i];
 -            if (count == 0)
 -                continue;
 -
 -            int rows = i + 1;
 -            totalSourceRows += rows * count;
 -            mergeSummary.append(String.format("%d:%d, ", rows, count));
 -            mergedRows.put(rows, count);
 +            SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
 +            logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 +                                      oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
 +            logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
 +            logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
          }
 +    }
  
 -        SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
 -        logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 -                                  toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
 -        logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
 +    private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
 +    {
 +        long minRepairedAt= Long.MAX_VALUE;
 +        for (SSTableReader sstable : actuallyCompact)
 +            minRepairedAt = Math.min(minRepairedAt, sstable.getSSTableMetadata().repairedAt);
 +        if (minRepairedAt == Long.MAX_VALUE)
 +            return ActiveRepairService.UNREPAIRED_SSTABLE;
 +        return minRepairedAt;
      }
  
+     protected void checkAvailableDiskSpace(long estimatedSSTables)
+     {
+         while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, getExpectedWriteSize()))
+         {
+             if (!reduceScopeForLimitedSpace())
+                 throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, getExpectedWriteSize()));
+         }
+     }
+ 
 -    private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
 +    private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt)
      {
          assert sstableDirectory != null;
          return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),