Posted to commits@cassandra.apache.org by be...@apache.org on 2014/09/19 19:26:48 UTC

[2/3] git commit: Fix resource leak in event of corrupt sstable

Fix resource leak in event of corrupt sstable

patch by benedict; review by yukim for CASSANDRA-7932
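
For readers skimming the diff: the leak came from opening compaction scanners eagerly and never closing them if creating one of them, or the compaction that followed, threw (see the removed TODO in CompactionTask.java, "errors when creating the scanners can result in untidied resources"). The patch introduces a small AutoCloseable holder, ScannerList, and applies the open-all-or-close-what-you-have idiom. Below is a minimal, self-contained sketch of that idiom; Resource, ResourceFactory and OpenAllSketch are illustrative stand-ins, not Cassandra types.

    import java.util.ArrayList;
    import java.util.List;

    final class OpenAllSketch
    {
        interface Resource extends AutoCloseable
        {
            @Override
            void close(); // narrowed so close() throws no checked exception (keeps the sketch short)
        }

        interface ResourceFactory
        {
            Resource open(); // may throw an unchecked exception, e.g. on a corrupt file
        }

        static List<Resource> openAll(Iterable<ResourceFactory> factories)
        {
            List<Resource> opened = new ArrayList<>();
            try
            {
                for (ResourceFactory factory : factories)
                    opened.add(factory.open());
            }
            catch (Throwable t)
            {
                // close everything opened so far, keeping the original failure primary
                for (Resource resource : opened)
                {
                    try
                    {
                        resource.close();
                    }
                    catch (Throwable t2)
                    {
                        t.addSuppressed(t2);
                    }
                }
                throw t; // compiles without 'throws Throwable': the try block only throws unchecked exceptions
            }
            return opened;
        }
    }

In the patch itself, ScannerList (added to AbstractCompactionStrategy below) is that holder, and its close() aggregates per-scanner failures the same way, so call sites can rely on try-with-resources.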


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e831007
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e831007
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e831007

Branch: refs/heads/trunk
Commit: 0e831007760bffced8687f51b99525b650d7e193
Parents: c5c0585
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Sep 19 18:17:19 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 19 18:17:19 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   5 +-
 .../compaction/AbstractCompactionStrategy.java  |  56 ++++-
 .../db/compaction/CompactionManager.java        | 193 +++++++++---------
 .../cassandra/db/compaction/CompactionTask.java | 203 +++++++++----------
 .../compaction/LeveledCompactionStrategy.java   |  43 ++--
 .../cassandra/db/compaction/Upgrader.java       |   3 +-
 .../cassandra/utils/CloseableIterator.java      |   2 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 9 files changed, 286 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3ee7d9..f55e5d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * Fix resource leak in event of corrupt sstable
  * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
  * Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
  * Invalidate prepared statements when their keyspace or table is

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 857e8bd..24ea9dd 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -320,7 +320,7 @@ public class DataTracker
     void removeUnreadableSSTables(File directory)
     {
         View currentView, newView;
-        List<SSTableReader> remaining = new ArrayList<>();
+        Set<SSTableReader> remaining = new HashSet<>();
         do
         {
             currentView = view.get();
@@ -334,6 +334,9 @@ public class DataTracker
             newView = currentView.replace(currentView.sstables, remaining);
         }
         while (!view.compareAndSet(currentView, newView));
+        for (SSTableReader sstable : currentView.sstables)
+            if (!remaining.contains(sstable))
+                sstable.releaseReference();
         notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1bbc93d..97696a8 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
@@ -264,16 +265,61 @@ public abstract class AbstractCompactionStrategy
      * allow for a more memory efficient solution if we know the sstable don't overlap (see
      * LeveledCompactionStrategy for instance).
      */
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         RateLimiter limiter = CompactionManager.instance.getRateLimiter();
         ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
-        for (SSTableReader sstable : sstables)
-            scanners.add(sstable.getScanner(range, limiter));
-        return scanners;
+        try
+        {
+            for (SSTableReader sstable : sstables)
+                scanners.add(sstable.getScanner(range, limiter));
+        }
+        catch (Throwable t)
+        {
+            try
+            {
+                new ScannerList(scanners).close();
+            }
+            catch (Throwable t2)
+            {
+                t.addSuppressed(t2);
+            }
+            throw t;
+        }
+        return new ScannerList(scanners);
+    }
+
+    public static class ScannerList implements AutoCloseable
+    {
+        public final List<ICompactionScanner> scanners;
+        public ScannerList(List<ICompactionScanner> scanners)
+        {
+            this.scanners = scanners;
+        }
+
+        public void close()
+        {
+            Throwable t = null;
+            for (ICompactionScanner scanner : scanners)
+            {
+                try
+                {
+                    scanner.close();
+                }
+                catch (Throwable t2)
+                {
+                    if (t == null)
+                        t = t2;
+                    else
+                        t.addSuppressed(t2);
+                }
+            }
+            if (t != null)
+                throw Throwables.propagate(t);
+        }
     }
 
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact)
+    public ScannerList getScanners(Collection<SSTableReader> toCompact)
     {
         return getScanners(toCompact, null);
     }
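
Because the new ScannerList is AutoCloseable, call sites (see the CompactionManager, CompactionTask and Upgrader hunks below) hold the whole batch of scanners in a try-with-resources block instead of closing individual scanners by hand. A minimal usage sketch, assuming a compaction strategy and a collection of sstables are already in scope:

    try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(sstables))
    {
        for (ICompactionScanner scanner : scanners.scanners)
        {
            // consume each scanner; all of them are closed when the block exits,
            // whether it completes normally or throws
        }
    }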

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 296fe45..e309cfb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -870,89 +870,98 @@ public class CompactionManager implements CompactionManagerMBean
         if (!cfs.isValid())
             return;
 
-        Collection<SSTableReader> sstables;
-        String snapshotName = validator.desc.sessionId.toString();
-        int gcBefore;
-        boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
-        if (isSnapshotValidation)
-        {
-            // If there is a snapshot created for the session then read from there.
-            sstables = cfs.getSnapshotSSTableReader(snapshotName);
-
-            // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
-            // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
-            // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
-            // 'as good as in the non-snapshot' case)
-            gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
-        }
-        else
+        Collection<SSTableReader> sstables = null;
+        try
         {
-            // flush first so everyone is validating data that is as similar as possible
-            StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-            // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
-            // instead so they won't be cleaned up if they do get compacted during the validation
-            if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
-                sstables = cfs.markCurrentSSTablesReferenced();
-            else
-                sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 
-            if (validator.gcBefore > 0)
-                gcBefore = validator.gcBefore;
+            String snapshotName = validator.desc.sessionId.toString();
+            int gcBefore;
+            boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+            if (isSnapshotValidation)
+            {
+                // If there is a snapshot created for the session then read from there.
+                sstables = cfs.getSnapshotSSTableReader(snapshotName);
+
+                // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
+                // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
+                // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
+                // 'as good as in the non-snapshot' case)
+                gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+            }
             else
-                gcBefore = getDefaultGcBefore(cfs);
-        }
-
-        // Create Merkle tree suitable to hold estimated partitions for given range.
-        // We blindly assume that partition is evenly distributed on all sstables for now.
-        long numPartitions = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
-        }
-        // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-        int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
-        MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+            {
+                // flush first so everyone is validating data that is as similar as possible
+                StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
+                // instead so they won't be cleaned up if they do get compacted during the validation
+                if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
+                    sstables = cfs.markCurrentSSTablesReferenced();
+                else
+                    sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 
-        CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+                if (validator.gcBefore > 0)
+                    gcBefore = validator.gcBefore;
+                else
+                    gcBefore = getDefaultGcBefore(cfs);
+            }
 
-        long start = System.nanoTime();
-        metrics.beginCompaction(ci);
-        try
-        {
-            // validate the CF as we iterate over it
-            validator.prepare(cfs, tree);
-            while (iter.hasNext())
+            // Create Merkle tree suitable to hold estimated partitions for given range.
+            // We blindly assume that partition is evenly distributed on all sstables for now.
+            long numPartitions = 0;
+            for (SSTableReader sstable : sstables)
             {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
-                AbstractCompactedRow row = iter.next();
-                validator.add(row);
+                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
             }
-            validator.complete();
-        }
-        finally
-        {
-            iter.close();
-            SSTableReader.releaseReferences(sstables);
-            if (isSnapshotValidation)
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
+            long start = System.nanoTime();
+            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
             {
-                cfs.clearSnapshot(snapshotName);
+                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+                metrics.beginCompaction(ci);
+                try
+                {
+                    // validate the CF as we iterate over it
+                    validator.prepare(cfs, tree);
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+                        AbstractCompactedRow row = iter.next();
+                        validator.add(row);
+                    }
+                    validator.complete();
+                }
+                finally
+                {
+                    if (isSnapshotValidation)
+                    {
+                        cfs.clearSnapshot(snapshotName);
+                    }
+
+                    metrics.finishCompaction(ci);
+                }
             }
 
-            metrics.finishCompaction(ci);
+            if (logger.isDebugEnabled())
+            {
+                // MT serialize may take time
+                long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+                             duration,
+                             depth,
+                             numPartitions,
+                             MerkleTree.serializer.serializedSize(tree, 0),
+                             validator.desc);
+            }
         }
-
-        if (logger.isDebugEnabled())
+        finally
         {
-            // MT serialize may take time
-            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-            logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
-                         duration,
-                         depth,
-                         numPartitions,
-                         MerkleTree.serializer.serializedSize(tree, 0),
-                         validator.desc);
+            if (sstables != null)
+                SSTableReader.releaseReferences(sstables);
         }
     }
 
@@ -993,32 +1002,28 @@ public class CompactionManager implements CompactionManagerMBean
             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
 
             AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-            List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
-
-            try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
+                 CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
             {
                 repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
                 unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
 
-                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
-
-                try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+                while(iter.hasNext())
                 {
-                    while(iter.hasNext())
+                    AbstractCompactedRow row = iter.next();
+                    // if current range from sstable is repaired, save it into the new repaired sstable
+                    if (Range.isInRanges(row.key.getToken(), ranges))
                     {
-                        AbstractCompactedRow row = iter.next();
-                        // if current range from sstable is repaired, save it into the new repaired sstable
-                        if (Range.isInRanges(row.key.getToken(), ranges))
-                        {
-                            repairedSSTableWriter.append(row);
-                            repairedKeyCount++;
-                        }
-                        // otherwise save into the new 'non-repaired' table
-                        else
-                        {
-                            unRepairedSSTableWriter.append(row);
-                            unrepairedKeyCount++;
-                        }
+                        repairedSSTableWriter.append(row);
+                        repairedKeyCount++;
+                    }
+                    // otherwise save into the new 'non-repaired' table
+                    else
+                    {
+                        unRepairedSSTableWriter.append(row);
+                        unrepairedKeyCount++;
                     }
                 }
                 // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
@@ -1109,11 +1114,9 @@ public class CompactionManager implements CompactionManagerMBean
 
     private static class ValidationCompactionIterable extends CompactionIterable
     {
-        public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range, int gcBefore)
+        public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore)
         {
-            super(OperationType.VALIDATION,
-                  cfs.getCompactionStrategy().getScanners(sstables, range),
-                  new ValidationCompactionController(cfs, gcBefore));
+            super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c1c5504..6217348 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -129,9 +130,6 @@ public class CompactionTask extends AbstractCompactionTask
 
         UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
 
-        CompactionController controller = getCompactionController(sstables);
-        Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
-
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
@@ -139,120 +137,117 @@ public class CompactionTask extends AbstractCompactionTask
 
         long start = System.nanoTime();
         long totalKeysWritten = 0;
-        long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
-        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
-        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
-        logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-
-        // TODO: errors when creating the scanners can result in untidied resources
-        AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
-        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
-        // replace the old entries.  Track entries to preheat here until then.
-        long minRepairedAt = getMinRepairedAt(actuallyCompact);
-        // we only need the age of the data that we're actually retaining
-        long maxAge = getMaxDataAge(actuallyCompact);
-        if (collector != null)
-            collector.beginCompaction(ci);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
-        try
+
+        try (CompactionController controller = getCompactionController(sstables);)
         {
-            if (!iter.hasNext())
-            {
-                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                // we need to sync it (via closeAndOpen) first, so there is no period during which
-                // a crash could cause data loss.
-                cfs.markObsolete(sstables, compactionType);
-                return;
-            }
 
-            writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
-            while (iter.hasNext())
-            {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
+            Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+            long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+            long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+            long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+            logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
-                AbstractCompactedRow row = iter.next();
-                if (writer.append(row) != null)
+            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+            {
+                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+
+                // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+                // replace the old entries.  Track entries to preheat here until then.
+                long minRepairedAt = getMinRepairedAt(actuallyCompact);
+                // we only need the age of the data that we're actually retaining
+                long maxAge = getMaxDataAge(actuallyCompact);
+                if (collector != null)
+                    collector.beginCompaction(ci);
+                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+                try
                 {
-                    totalKeysWritten++;
-                    if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                    if (!iter.hasNext())
                     {
-                        writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                        // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                        // we need to sync it (via closeAndOpen) first, so there is no period during which
+                        // a crash could cause data loss.
+                        cfs.markObsolete(sstables, compactionType);
+                        return;
                     }
+
+                    writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+                        AbstractCompactedRow row = iter.next();
+                        if (writer.append(row) != null)
+                        {
+                            totalKeysWritten++;
+                            if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                            {
+                                writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                            }
+                        }
+                    }
+
+                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                    writer.finish(false);
                 }
-            }
+                catch (Throwable t)
+                {
+                    writer.abort();
+                    throw t;
+                }
+                finally
+                {
 
-            // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-            writer.finish(false);
-        }
-        catch (Throwable t)
-        {
-            writer.abort();
-            throw t;
-        }
-        finally
-        {
-            controller.close();
+                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                    // (in replaceCompactedSSTables)
+                    if (taskId != null)
+                        SystemKeyspace.finishCompaction(taskId);
 
-            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-            // (in replaceCompactedSSTables)
-            if (taskId != null)
-                SystemKeyspace.finishCompaction(taskId);
+                    if (collector != null)
+                        collector.finishCompaction(ci);
+                }
 
-            if (collector != null)
-                collector.finishCompaction(ci);
+                Collection<SSTableReader> oldSStables = this.sstables;
+                List<SSTableReader> newSStables = writer.finished();
+                if (!offline)
+                    cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+                // log a bunch of statistics about the result and save to system table compaction_history
+                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                long startsize = SSTableReader.getTotalBytes(oldSStables);
+                long endsize = SSTableReader.getTotalBytes(newSStables);
+                double ratio = (double) endsize / (double) startsize;
+
+                StringBuilder newSSTableNames = new StringBuilder();
+                for (SSTableReader reader : newSStables)
+                    newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+                long totalSourceRows = 0;
+                long[] counts = ci.getMergedRowCounts();
+                StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+                Map<Integer, Long> mergedRows = new HashMap<>();
+                for (int i = 0; i < counts.length; i++)
+                {
+                    long count = counts[i];
+                    if (count == 0)
+                        continue;
+
+                    int rows = i + 1;
+                    totalSourceRows += rows * count;
+                    mergeSummary.append(String.format("%d:%d, ", rows, count));
+                    mergedRows.put(rows, count);
+                }
 
-            try
-            {
-                // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
-                // we don't end up with compaction information hanging around indefinitely in limbo.
-                iter.close();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
+                SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+                logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
+                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+                logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+                logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
             }
         }
-
-        Collection<SSTableReader> oldSStables = this.sstables;
-        List<SSTableReader> newSStables = writer.finished();
-        if (!offline)
-            cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
-        // log a bunch of statistics about the result and save to system table compaction_history
-        long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        long startsize = SSTableReader.getTotalBytes(oldSStables);
-        long endsize = SSTableReader.getTotalBytes(newSStables);
-        double ratio = (double) endsize / (double) startsize;
-
-        StringBuilder newSSTableNames = new StringBuilder();
-        for (SSTableReader reader : newSStables)
-            newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
-        double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-        long totalSourceRows = 0;
-        long[] counts = ci.getMergedRowCounts();
-        StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-        Map<Integer, Long> mergedRows = new HashMap<>();
-        for (int i = 0; i < counts.length; i++)
-        {
-            long count = counts[i];
-            if (count == 0)
-                continue;
-
-            int rows = i + 1;
-            totalSourceRows += rows * count;
-            mergeSummary.append(String.format("%d:%d, ", rows, count));
-            mergedRows.put(rows, count);
-        }
-
-        SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-        logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                  oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-        logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-        logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
     }
 
     private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3ee59ad..7f2d881 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -198,7 +198,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         return maxSSTableSizeInMB * 1024L * 1024L;
     }
 
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
@@ -210,26 +210,41 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         }
 
         List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
-        for (Integer level : byLevel.keySet())
+        try
         {
-            // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
-            // since we don't know which level those sstable belong yet, we simply do the same as L0 sstables.
-            if (level <= 0)
+            for (Integer level : byLevel.keySet())
             {
-                // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner for each
-                for (SSTableReader sstable : byLevel.get(level))
-                    scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+                // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
+                // since we don't know which level those sstable belong yet, we simply do the same as L0 sstables.
+                if (level <= 0)
+                {
+                    // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner for each
+                    for (SSTableReader sstable : byLevel.get(level))
+                        scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+                }
+                else
+                {
+                    // Create a LeveledScanner that only opens one sstable at a time, in sorted order
+                    List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
+                    if (!intersecting.isEmpty())
+                        scanners.add(new LeveledScanner(intersecting, range));
+                }
             }
-            else
+        }
+        catch (Throwable t)
+        {
+            try
+            {
+                new ScannerList(scanners).close();
+            }
+            catch (Throwable t2)
             {
-                // Create a LeveledScanner that only opens one sstable at a time, in sorted order
-                List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
-                if (!intersecting.isEmpty())
-                    scanners.add(new LeveledScanner(intersecting, range));
+                t.addSuppressed(t2);
             }
+            throw t;
         }
 
-        return scanners;
+        return new ScannerList(scanners);
     }
 
     // Lazily creates SSTableBoundedScanner for sstable that are assumed to be from the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 734fe23..f102fef 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -88,8 +88,9 @@ public class Upgrader
         outputHandler.output("Upgrading " + sstable);
 
         SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
-        try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
+        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
         {
+            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java
index 399c6d1..7474f3d 100644
--- a/src/java/org/apache/cassandra/utils/CloseableIterator.java
+++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java
@@ -21,6 +21,6 @@ import java.io.Closeable;
 import java.util.Iterator;
 
 // so we can instantiate anonymous classes implementing both interfaces
-public interface CloseableIterator<T> extends Iterator<T>, Closeable
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable
 {
 }
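
Closeable already extends AutoCloseable on Java 7, so naming AutoCloseable explicitly here mainly documents intent; either way a CloseableIterator can be scoped with try-with-resources. A small sketch with hypothetical rowIterator() and process() helpers:

    // close() is inherited from Closeable and may throw IOException,
    // which the enclosing method handles or declares.
    try (CloseableIterator<AbstractCompactedRow> iter = rowIterator())
    {
        while (iter.hasNext())
            process(iter.next());
    }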

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index defb087..65c7b69 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -143,7 +143,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
 
         // get LeveledScanner for level 1 sstables
         Collection<SSTableReader> sstables = strategy.manifest.getLevel(1);
-        List<ICompactionScanner> scanners = strategy.getScanners(sstables);
+        List<ICompactionScanner> scanners = strategy.getScanners(sstables).scanners;
         assertEquals(1, scanners.size()); // should be one per level
         ICompactionScanner scanner = scanners.get(0);
         // scan through to the end