Posted to commits@cassandra.apache.org by be...@apache.org on 2014/09/19 19:26:47 UTC

[1/3] git commit: Fix resource leak in event of corrupt sstable

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 c5c0585b4 -> 0e8310077
  refs/heads/trunk 3e305f809 -> 0956a8a71


Fix resource leak in event of corrupt sstable

patch by benedict; review by yukim for CASSANDRA-7932

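The leak being fixed arises when opening compaction scanners for a set of sstables fails partway through: scanners already opened for earlier sstables are never closed. A minimal sketch of the pre-patch pattern (illustrative only; names follow the diff below, and `sstables`, `range`, and `limiter` are assumed to be in scope):

    // Pre-patch pattern (sketch): scanners are collected into a plain list with
    // no cleanup path, so a CorruptSSTableException thrown by getScanner()
    // mid-loop leaks every scanner opened so far.
    List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
    for (SSTableReader sstable : sstables)
        scanners.add(sstable.getScanner(range, limiter)); // may throw on a corrupt sstable
    return scanners; // callers had no single handle to close on failure

The patch wraps the scanners in an AutoCloseable ScannerList so callers can release them even when construction or iteration fails, and releases sstable references that were previously dropped on the floor.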

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e831007
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e831007
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e831007

Branch: refs/heads/cassandra-2.1
Commit: 0e831007760bffced8687f51b99525b650d7e193
Parents: c5c0585
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Sep 19 18:17:19 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 19 18:17:19 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   5 +-
 .../compaction/AbstractCompactionStrategy.java  |  56 ++++-
 .../db/compaction/CompactionManager.java        | 193 +++++++++---------
 .../cassandra/db/compaction/CompactionTask.java | 203 +++++++++----------
 .../compaction/LeveledCompactionStrategy.java   |  43 ++--
 .../cassandra/db/compaction/Upgrader.java       |   3 +-
 .../cassandra/utils/CloseableIterator.java      |   2 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 9 files changed, 286 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3ee7d9..f55e5d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * Fix resource leak in event of corrupt sstable
  * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
  * Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
  * Invalidate prepared statements when their keyspace or table is

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 857e8bd..24ea9dd 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -320,7 +320,7 @@ public class DataTracker
     void removeUnreadableSSTables(File directory)
     {
         View currentView, newView;
-        List<SSTableReader> remaining = new ArrayList<>();
+        Set<SSTableReader> remaining = new HashSet<>();
         do
         {
             currentView = view.get();
@@ -334,6 +334,9 @@ public class DataTracker
             newView = currentView.replace(currentView.sstables, remaining);
         }
         while (!view.compareAndSet(currentView, newView));
+        for (SSTableReader sstable : currentView.sstables)
+            if (!remaining.contains(sstable))
+                sstable.releaseReference();
         notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1bbc93d..97696a8 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
@@ -264,16 +265,61 @@ public abstract class AbstractCompactionStrategy
      * allow for a more memory efficient solution if we know the sstable don't overlap (see
      * LeveledCompactionStrategy for instance).
      */
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         RateLimiter limiter = CompactionManager.instance.getRateLimiter();
         ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
-        for (SSTableReader sstable : sstables)
-            scanners.add(sstable.getScanner(range, limiter));
-        return scanners;
+        try
+        {
+            for (SSTableReader sstable : sstables)
+                scanners.add(sstable.getScanner(range, limiter));
+        }
+        catch (Throwable t)
+        {
+            try
+            {
+                new ScannerList(scanners).close();
+            }
+            catch (Throwable t2)
+            {
+                t.addSuppressed(t2);
+            }
+            throw t;
+        }
+        return new ScannerList(scanners);
+    }
+
+    public static class ScannerList implements AutoCloseable
+    {
+        public final List<ICompactionScanner> scanners;
+        public ScannerList(List<ICompactionScanner> scanners)
+        {
+            this.scanners = scanners;
+        }
+
+        public void close()
+        {
+            Throwable t = null;
+            for (ICompactionScanner scanner : scanners)
+            {
+                try
+                {
+                    scanner.close();
+                }
+                catch (Throwable t2)
+                {
+                    if (t == null)
+                        t = t2;
+                    else
+                        t.addSuppressed(t2);
+                }
+            }
+            if (t != null)
+                throw Throwables.propagate(t);
+        }
     }
 
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact)
+    public ScannerList getScanners(Collection<SSTableReader> toCompact)
     {
         return getScanners(toCompact, null);
     }

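With getScanners() now returning an AutoCloseable ScannerList, call sites can guarantee cleanup via try-with-resources. A usage sketch mirroring the pattern adopted in CompactionTask and Upgrader below; `strategy`, `sstables`, `compactionType`, and `controller` are assumed to already be in scope:

    try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(sstables))
    {
        CompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
        Iterator<AbstractCompactedRow> iter = ci.iterator();
        while (iter.hasNext())
        {
            AbstractCompactedRow row = iter.next();
            // ... append row to the output writer ...
        }
    }   // ScannerList.close() runs even if iteration throws, closing every scanner
        // and chaining any secondary failures via addSuppressed
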
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 296fe45..e309cfb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -870,89 +870,98 @@ public class CompactionManager implements CompactionManagerMBean
         if (!cfs.isValid())
             return;
 
-        Collection<SSTableReader> sstables;
-        String snapshotName = validator.desc.sessionId.toString();
-        int gcBefore;
-        boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
-        if (isSnapshotValidation)
-        {
-            // If there is a snapshot created for the session then read from there.
-            sstables = cfs.getSnapshotSSTableReader(snapshotName);
-
-            // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
-            // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
-            // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
-            // 'as good as in the non-snapshot' case)
-            gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
-        }
-        else
+        Collection<SSTableReader> sstables = null;
+        try
         {
-            // flush first so everyone is validating data that is as similar as possible
-            StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-            // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
-            // instead so they won't be cleaned up if they do get compacted during the validation
-            if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
-                sstables = cfs.markCurrentSSTablesReferenced();
-            else
-                sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 
-            if (validator.gcBefore > 0)
-                gcBefore = validator.gcBefore;
+            String snapshotName = validator.desc.sessionId.toString();
+            int gcBefore;
+            boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+            if (isSnapshotValidation)
+            {
+                // If there is a snapshot created for the session then read from there.
+                sstables = cfs.getSnapshotSSTableReader(snapshotName);
+
+                // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
+                // this at a different time (that's the whole purpose of repair with snaphsot). So instead we take the creation
+                // time of the snapshot, which should give us roughtly the same time on each replica (roughtly being in that case
+                // 'as good as in the non-snapshot' case)
+                gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+            }
             else
-                gcBefore = getDefaultGcBefore(cfs);
-        }
-
-        // Create Merkle tree suitable to hold estimated partitions for given range.
-        // We blindly assume that partition is evenly distributed on all sstables for now.
-        long numPartitions = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
-        }
-        // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-        int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
-        MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+            {
+                // flush first so everyone is validating data that is as similar as possible
+                StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
+                // instead so they won't be cleaned up if they do get compacted during the validation
+                if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
+                    sstables = cfs.markCurrentSSTablesReferenced();
+                else
+                    sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 
-        CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+                if (validator.gcBefore > 0)
+                    gcBefore = validator.gcBefore;
+                else
+                    gcBefore = getDefaultGcBefore(cfs);
+            }
 
-        long start = System.nanoTime();
-        metrics.beginCompaction(ci);
-        try
-        {
-            // validate the CF as we iterate over it
-            validator.prepare(cfs, tree);
-            while (iter.hasNext())
+            // Create Merkle tree suitable to hold estimated partitions for given range.
+            // We blindly assume that partition is evenly distributed on all sstables for now.
+            long numPartitions = 0;
+            for (SSTableReader sstable : sstables)
             {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
-                AbstractCompactedRow row = iter.next();
-                validator.add(row);
+                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
             }
-            validator.complete();
-        }
-        finally
-        {
-            iter.close();
-            SSTableReader.releaseReferences(sstables);
-            if (isSnapshotValidation)
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
+            long start = System.nanoTime();
+            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
             {
-                cfs.clearSnapshot(snapshotName);
+                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+                metrics.beginCompaction(ci);
+                try
+                {
+                    // validate the CF as we iterate over it
+                    validator.prepare(cfs, tree);
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+                        AbstractCompactedRow row = iter.next();
+                        validator.add(row);
+                    }
+                    validator.complete();
+                }
+                finally
+                {
+                    if (isSnapshotValidation)
+                    {
+                        cfs.clearSnapshot(snapshotName);
+                    }
+
+                    metrics.finishCompaction(ci);
+                }
             }
 
-            metrics.finishCompaction(ci);
+            if (logger.isDebugEnabled())
+            {
+                // MT serialize may take time
+                long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+                             duration,
+                             depth,
+                             numPartitions,
+                             MerkleTree.serializer.serializedSize(tree, 0),
+                             validator.desc);
+            }
         }
-
-        if (logger.isDebugEnabled())
+        finally
         {
-            // MT serialize may take time
-            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-            logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
-                         duration,
-                         depth,
-                         numPartitions,
-                         MerkleTree.serializer.serializedSize(tree, 0),
-                         validator.desc);
+            if (sstables != null)
+                SSTableReader.releaseReferences(sstables);
         }
     }
 
@@ -993,32 +1002,28 @@ public class CompactionManager implements CompactionManagerMBean
             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
 
             AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-            List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
-
-            try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
+                 CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
             {
                 repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
                 unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
 
-                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
-
-                try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+                while(iter.hasNext())
                 {
-                    while(iter.hasNext())
+                    AbstractCompactedRow row = iter.next();
+                    // if current range from sstable is repaired, save it into the new repaired sstable
+                    if (Range.isInRanges(row.key.getToken(), ranges))
                     {
-                        AbstractCompactedRow row = iter.next();
-                        // if current range from sstable is repaired, save it into the new repaired sstable
-                        if (Range.isInRanges(row.key.getToken(), ranges))
-                        {
-                            repairedSSTableWriter.append(row);
-                            repairedKeyCount++;
-                        }
-                        // otherwise save into the new 'non-repaired' table
-                        else
-                        {
-                            unRepairedSSTableWriter.append(row);
-                            unrepairedKeyCount++;
-                        }
+                        repairedSSTableWriter.append(row);
+                        repairedKeyCount++;
+                    }
+                    // otherwise save into the new 'non-repaired' table
+                    else
+                    {
+                        unRepairedSSTableWriter.append(row);
+                        unrepairedKeyCount++;
                     }
                 }
                 // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
@@ -1109,11 +1114,9 @@ public class CompactionManager implements CompactionManagerMBean
 
     private static class ValidationCompactionIterable extends CompactionIterable
     {
-        public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range, int gcBefore)
+        public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore)
         {
-            super(OperationType.VALIDATION,
-                  cfs.getCompactionStrategy().getScanners(sstables, range),
-                  new ValidationCompactionController(cfs, gcBefore));
+            super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c1c5504..6217348 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -129,9 +130,6 @@ public class CompactionTask extends AbstractCompactionTask
 
         UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
 
-        CompactionController controller = getCompactionController(sstables);
-        Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
-
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
@@ -139,120 +137,117 @@ public class CompactionTask extends AbstractCompactionTask
 
         long start = System.nanoTime();
         long totalKeysWritten = 0;
-        long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
-        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
-        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
-        logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-
-        // TODO: errors when creating the scanners can result in untidied resources
-        AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
-        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
-        // replace the old entries.  Track entries to preheat here until then.
-        long minRepairedAt = getMinRepairedAt(actuallyCompact);
-        // we only need the age of the data that we're actually retaining
-        long maxAge = getMaxDataAge(actuallyCompact);
-        if (collector != null)
-            collector.beginCompaction(ci);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
-        try
+
+        try (CompactionController controller = getCompactionController(sstables);)
         {
-            if (!iter.hasNext())
-            {
-                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                // we need to sync it (via closeAndOpen) first, so there is no period during which
-                // a crash could cause data loss.
-                cfs.markObsolete(sstables, compactionType);
-                return;
-            }
 
-            writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
-            while (iter.hasNext())
-            {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
+            Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+            long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+            long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+            long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+            logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
-                AbstractCompactedRow row = iter.next();
-                if (writer.append(row) != null)
+            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+            {
+                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+
+                // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+                // replace the old entries.  Track entries to preheat here until then.
+                long minRepairedAt = getMinRepairedAt(actuallyCompact);
+                // we only need the age of the data that we're actually retaining
+                long maxAge = getMaxDataAge(actuallyCompact);
+                if (collector != null)
+                    collector.beginCompaction(ci);
+                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+                try
                 {
-                    totalKeysWritten++;
-                    if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                    if (!iter.hasNext())
                     {
-                        writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                        // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                        // we need to sync it (via closeAndOpen) first, so there is no period during which
+                        // a crash could cause data loss.
+                        cfs.markObsolete(sstables, compactionType);
+                        return;
                     }
+
+                    writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+                        AbstractCompactedRow row = iter.next();
+                        if (writer.append(row) != null)
+                        {
+                            totalKeysWritten++;
+                            if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                            {
+                                writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                            }
+                        }
+                    }
+
+                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                    writer.finish(false);
                 }
-            }
+                catch (Throwable t)
+                {
+                    writer.abort();
+                    throw t;
+                }
+                finally
+                {
 
-            // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-            writer.finish(false);
-        }
-        catch (Throwable t)
-        {
-            writer.abort();
-            throw t;
-        }
-        finally
-        {
-            controller.close();
+                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                    // (in replaceCompactedSSTables)
+                    if (taskId != null)
+                        SystemKeyspace.finishCompaction(taskId);
 
-            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-            // (in replaceCompactedSSTables)
-            if (taskId != null)
-                SystemKeyspace.finishCompaction(taskId);
+                    if (collector != null)
+                        collector.finishCompaction(ci);
+                }
 
-            if (collector != null)
-                collector.finishCompaction(ci);
+                Collection<SSTableReader> oldSStables = this.sstables;
+                List<SSTableReader> newSStables = writer.finished();
+                if (!offline)
+                    cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+                // log a bunch of statistics about the result and save to system table compaction_history
+                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                long startsize = SSTableReader.getTotalBytes(oldSStables);
+                long endsize = SSTableReader.getTotalBytes(newSStables);
+                double ratio = (double) endsize / (double) startsize;
+
+                StringBuilder newSSTableNames = new StringBuilder();
+                for (SSTableReader reader : newSStables)
+                    newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+                long totalSourceRows = 0;
+                long[] counts = ci.getMergedRowCounts();
+                StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+                Map<Integer, Long> mergedRows = new HashMap<>();
+                for (int i = 0; i < counts.length; i++)
+                {
+                    long count = counts[i];
+                    if (count == 0)
+                        continue;
+
+                    int rows = i + 1;
+                    totalSourceRows += rows * count;
+                    mergeSummary.append(String.format("%d:%d, ", rows, count));
+                    mergedRows.put(rows, count);
+                }
 
-            try
-            {
-                // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
-                // we don't end up with compaction information hanging around indefinitely in limbo.
-                iter.close();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
+                SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+                logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
+                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+                logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+                logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
             }
         }
-
-        Collection<SSTableReader> oldSStables = this.sstables;
-        List<SSTableReader> newSStables = writer.finished();
-        if (!offline)
-            cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
-        // log a bunch of statistics about the result and save to system table compaction_history
-        long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        long startsize = SSTableReader.getTotalBytes(oldSStables);
-        long endsize = SSTableReader.getTotalBytes(newSStables);
-        double ratio = (double) endsize / (double) startsize;
-
-        StringBuilder newSSTableNames = new StringBuilder();
-        for (SSTableReader reader : newSStables)
-            newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
-        double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-        long totalSourceRows = 0;
-        long[] counts = ci.getMergedRowCounts();
-        StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-        Map<Integer, Long> mergedRows = new HashMap<>();
-        for (int i = 0; i < counts.length; i++)
-        {
-            long count = counts[i];
-            if (count == 0)
-                continue;
-
-            int rows = i + 1;
-            totalSourceRows += rows * count;
-            mergeSummary.append(String.format("%d:%d, ", rows, count));
-            mergedRows.put(rows, count);
-        }
-
-        SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-        logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                  oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-        logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-        logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
     }
 
     private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3ee59ad..7f2d881 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -198,7 +198,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         return maxSSTableSizeInMB * 1024L * 1024L;
     }
 
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
@@ -210,26 +210,41 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         }
 
         List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
-        for (Integer level : byLevel.keySet())
+        try
         {
-            // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
-            // since we don't know which level those sstable belong yet, we simply do the same as L0 sstables.
-            if (level <= 0)
+            for (Integer level : byLevel.keySet())
             {
-                // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner for each
-                for (SSTableReader sstable : byLevel.get(level))
-                    scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+                // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
+                // since we don't know which level those sstable belong yet, we simply do the same as L0 sstables.
+                if (level <= 0)
+                {
+                    // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner for each
+                    for (SSTableReader sstable : byLevel.get(level))
+                        scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+                }
+                else
+                {
+                    // Create a LeveledScanner that only opens one sstable at a time, in sorted order
+                    List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
+                    if (!intersecting.isEmpty())
+                        scanners.add(new LeveledScanner(intersecting, range));
+                }
             }
-            else
+        }
+        catch (Throwable t)
+        {
+            try
+            {
+                new ScannerList(scanners).close();
+            }
+            catch (Throwable t2)
             {
-                // Create a LeveledScanner that only opens one sstable at a time, in sorted order
-                List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
-                if (!intersecting.isEmpty())
-                    scanners.add(new LeveledScanner(intersecting, range));
+                t.addSuppressed(t2);
             }
+            throw t;
         }
 
-        return scanners;
+        return new ScannerList(scanners);
     }
 
     // Lazily creates SSTableBoundedScanner for sstable that are assumed to be from the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 734fe23..f102fef 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -88,8 +88,9 @@ public class Upgrader
         outputHandler.output("Upgrading " + sstable);
 
         SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
-        try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
+        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
         {
+            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java
index 399c6d1..7474f3d 100644
--- a/src/java/org/apache/cassandra/utils/CloseableIterator.java
+++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java
@@ -21,6 +21,6 @@ import java.io.Closeable;
 import java.util.Iterator;
 
 // so we can instantiate anonymous classes implementing both interfaces
-public interface CloseableIterator<T> extends Iterator<T>, Closeable
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable
 {
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index defb087..65c7b69 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -143,7 +143,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
 
         // get LeveledScanner for level 1 sstables
         Collection<SSTableReader> sstables = strategy.manifest.getLevel(1);
-        List<ICompactionScanner> scanners = strategy.getScanners(sstables);
+        List<ICompactionScanner> scanners = strategy.getScanners(sstables).scanners;
         assertEquals(1, scanners.size()); // should be one per level
         ICompactionScanner scanner = scanners.get(0);
         // scan through to the end


[3/3] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	src/java/org/apache/cassandra/db/compaction/CompactionTask.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0956a8a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0956a8a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0956a8a7

Branch: refs/heads/trunk
Commit: 0956a8a717781c8a748931f04a18a215d7d53869
Parents: 3e305f8 0e83100
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Sep 19 18:26:23 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 19 18:26:23 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   5 +-
 .../compaction/AbstractCompactionStrategy.java  |  56 ++++-
 .../db/compaction/CompactionManager.java        | 199 +++++++++---------
 .../cassandra/db/compaction/CompactionTask.java | 204 +++++++++----------
 .../compaction/LeveledCompactionStrategy.java   |  43 ++--
 .../cassandra/db/compaction/Upgrader.java       |   3 +-
 .../cassandra/utils/CloseableIterator.java      |   2 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 9 files changed, 289 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e00d990,f55e5d2..0af1681
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,29 -1,5 +1,30 @@@
 +3.0
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Support Java source code for user-defined functions (CASSANDRA-7562)
 + * Require arg types to disambiguate UDF drops (CASSANDRA-7812)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Verify that UDF class methods are static (CASSANDRA-7781)
 + * Support pure user-defined functions (CASSANDRA-7395, 7740)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 +
 +
  2.1.1
+  * Fix resource leak in event of corrupt sstable
   * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
   * Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
   * Invalidate prepared statements when their keyspace or table is

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 28ab84e,97696a8..6a0e0df
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -19,8 -19,10 +19,9 @@@ package org.apache.cassandra.db.compact
  
  import java.util.*;
  
+ import com.google.common.base.Throwables;
  import com.google.common.collect.ImmutableMap;
  import com.google.common.base.Predicate;
 -import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Iterables;
  import com.google.common.util.concurrent.RateLimiter;
  import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 85b7e38,e309cfb..0f8acba
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1031,78 -990,63 +1040,74 @@@ public class CompactionManager implemen
              if (!new File(sstable.getFilename()).exists())
              {
                  logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
 +                i.remove();
                  continue;
              }
 +            if (groupMaxDataAge < sstable.maxDataAge)
 +                groupMaxDataAge = sstable.maxDataAge;
 +        }
  
 -            logger.info("Anticompacting {}", sstable);
 -            Set<SSTableReader> sstableAsSet = new HashSet<>();
 -            sstableAsSet.add(sstable);
 +        if (anticompactionGroup.size() == 0)
 +        {
 +            logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
 +            return 0;
 +        }
  
 -            File destination = cfs.directories.getDirectoryForNewSSTables();
 -            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
 -            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
 +        logger.info("Anticompacting {}", anticompactionGroup);
 +        Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
  
 -            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 -            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
 -                 CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
 -            {
 -                repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
 -                unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
 +        File destination = cfs.directories.getDirectoryForNewSSTables();
 +        SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
 +        SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
  
-         AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-         List<ICompactionScanner> scanners = strategy.getScanners(anticompactionGroup);
- 
-         int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
- 
 -                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                while(iter.hasNext())
 +        long repairedKeyCount = 0;
 +        long unrepairedKeyCount = 0;
-         try (CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
++        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
++        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
++             CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
 +        {
++            int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
++
 +            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
 +            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
 +
-             CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
- 
-             try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
++            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
++            Iterator<AbstractCompactedRow> iter = ci.iterator();
++            while(iter.hasNext())
 +            {
-                 while(iter.hasNext())
++                AbstractCompactedRow row = iter.next();
++                // if current range from sstable is repaired, save it into the new repaired sstable
++                if (Range.isInRanges(row.key.getToken(), ranges))
                  {
--                    AbstractCompactedRow row = iter.next();
--                    // if current range from sstable is repaired, save it into the new repaired sstable
--                    if (Range.isInRanges(row.key.getToken(), ranges))
--                    {
--                        repairedSSTableWriter.append(row);
--                        repairedKeyCount++;
--                    }
--                    // otherwise save into the new 'non-repaired' table
--                    else
--                    {
--                        unRepairedSSTableWriter.append(row);
--                        unrepairedKeyCount++;
--                    }
++                    repairedSSTableWriter.append(row);
++                    repairedKeyCount++;
++                }
++                // otherwise save into the new 'non-repaired' table
++                else
++                {
++                    unRepairedSSTableWriter.append(row);
++                    unrepairedKeyCount++;
                  }
 -                // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 -                // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
 -                repairedSSTableWriter.finish(false, repairedAt);
 -                unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
 -                // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
 -                anticompactedSSTables.addAll(repairedSSTableWriter.finished());
 -                anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
 -            }
 -            catch (Throwable e)
 -            {
 -                logger.error("Error anticompacting " + sstable, e);
 -                repairedSSTableWriter.abort();
 -                unRepairedSSTableWriter.abort();
              }
 +            // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 +            // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
 +            repairedSSTableWriter.finish(false, repairedAt);
 +            unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
 +            // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
 +            logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
 +                                                                       repairedKeyCount + unrepairedKeyCount,
 +                                                                       cfs.keyspace.getName(),
 +                                                                       cfs.getColumnFamilyName(),
 +                                                                       anticompactionGroup);
 +            return repairedSSTableWriter.finished().size() + unRepairedSSTableWriter.finished().size();
          }
 -        String format = "Repaired {} keys of {} for {}/{}";
 -        logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
 -        String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
 -        logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
 -
 -        return anticompactedSSTables;
 +        catch (Throwable e)
 +        {
 +            logger.error("Error anticompacting " + anticompactionGroup, e);
 +            repairedSSTableWriter.abort();
 +            unRepairedSSTableWriter.abort();
 +        }
 +        return 0;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4a22d0c,6217348..527f483
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -20,7 -20,9 +20,8 @@@ package org.apache.cassandra.db.compact
  import java.io.File;
  import java.io.IOException;
  import java.util.Collection;
 -import java.util.Collections;
  import java.util.HashMap;
+ import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
@@@ -43,8 -47,7 +44,7 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.io.sstable.SSTableWriter;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.service.ActiveRepairService;
--import org.apache.cassandra.utils.CloseableIterator;
 +import org.apache.cassandra.utils.UUIDGen;
  
  public class CompactionTask extends AbstractCompactionTask
  {
@@@ -144,120 -137,117 +141,117 @@@
  
          long start = System.nanoTime();
          long totalKeysWritten = 0;
-         long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
-         long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
-         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
-         logger.debug("Expected bloom filter size : {}", keysPerSSTable);
- 
-         // TODO: errors when creating the scanners can result in untidied resources
-         AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
-         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
- 
-         // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
-         // replace the old entries.  Track entries to preheat here until then.
-         long minRepairedAt = getMinRepairedAt(actuallyCompact);
-         // we only need the age of the data that we're actually retaining
-         long maxAge = getMaxDataAge(actuallyCompact);
-         if (collector != null)
-             collector.beginCompaction(ci);
-         SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
-         try
+ 
+         try (CompactionController controller = getCompactionController(sstables);)
          {
-             if (!iter.hasNext())
-             {
-                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                 // we need to sync it (via closeAndOpen) first, so there is no period during which
-                 // a crash could cause data loss.
-                 cfs.markObsolete(sstables, compactionType);
-                 return;
-             }
  
-             writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
-             while (iter.hasNext())
-             {
-                 if (ci.isStopRequested())
-                     throw new CompactionInterruptedException(ci.getCompactionInfo());
+             Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+ 
+             long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+             long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+             long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
  
-                 AbstractCompactedRow row = iter.next();
-                 if (writer.append(row) != null)
+             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+             {
+                 AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                 Iterator<AbstractCompactedRow> iter = ci.iterator();
+ 
+                 // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+                 // replace the old entries.  Track entries to preheat here until then.
+                 long minRepairedAt = getMinRepairedAt(actuallyCompact);
+                 // we only need the age of the data that we're actually retaining
+                 long maxAge = getMaxDataAge(actuallyCompact);
+                 if (collector != null)
+                     collector.beginCompaction(ci);
+                 SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+                 try
                  {
-                     totalKeysWritten++;
-                     if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                     if (!iter.hasNext())
                      {
-                         writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                         // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                         // we need to sync it (via closeAndOpen) first, so there is no period during which
+                         // a crash could cause data loss.
+                         cfs.markObsolete(sstables, compactionType);
+                         return;
                      }
+ 
+                     writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                     while (iter.hasNext())
+                     {
+                         if (ci.isStopRequested())
+                             throw new CompactionInterruptedException(ci.getCompactionInfo());
+ 
+                         AbstractCompactedRow row = iter.next();
+                         if (writer.append(row) != null)
+                         {
+                             totalKeysWritten++;
+                             if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                             {
+                                 writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                             }
+                         }
+                     }
+ 
+                     // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                     writer.finish(false);
                  }
-             }
+                 catch (Throwable t)
+                 {
+                     writer.abort();
+                     throw t;
+                 }
+                 finally
+                 {
  
-             // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-             writer.finish(false);
-         }
-         catch (Throwable t)
-         {
-             writer.abort();
-             throw t;
-         }
-         finally
-         {
-             controller.close();
+                     // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                     // (in replaceCompactedSSTables)
+                     if (taskId != null)
+                         SystemKeyspace.finishCompaction(taskId);
  
-             // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-             // (in replaceCompactedSSTables)
-             if (taskId != null)
-                 SystemKeyspace.finishCompaction(taskId);
+                     if (collector != null)
+                         collector.finishCompaction(ci);
+                 }
  
-             if (collector != null)
-                 collector.finishCompaction(ci);
+                 Collection<SSTableReader> oldSStables = this.sstables;
+                 List<SSTableReader> newSStables = writer.finished();
+                 if (!offline)
+                     cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+ 
+                 // log a bunch of statistics about the result and save to system table compaction_history
+                 long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                 long startsize = SSTableReader.getTotalBytes(oldSStables);
+                 long endsize = SSTableReader.getTotalBytes(newSStables);
+                 double ratio = (double) endsize / (double) startsize;
+ 
+                 StringBuilder newSSTableNames = new StringBuilder();
+                 for (SSTableReader reader : newSStables)
+                     newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+ 
+                 double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+                 long totalSourceRows = 0;
+                 long[] counts = ci.getMergedRowCounts();
+                 StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+                 Map<Integer, Long> mergedRows = new HashMap<>();
+                 for (int i = 0; i < counts.length; i++)
+                 {
+                     long count = counts[i];
+                     if (count == 0)
+                         continue;
+ 
+                     int rows = i + 1;
+                     totalSourceRows += rows * count;
+                     mergeSummary.append(String.format("%d:%d, ", rows, count));
+                     mergedRows.put(rows, count);
+                 }
  
-             try
-             {
-                 // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
-                 // we don't end up with compaction information hanging around indefinitely in limbo.
-                 iter.close();
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
+                 SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
 -                logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 -                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
++                logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
++                                          taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+                 logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+                 logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
              }
          }
- 
-         Collection<SSTableReader> oldSStables = this.sstables;
-         List<SSTableReader> newSStables = writer.finished();
-         if (!offline)
-             cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
- 
-         // log a bunch of statistics about the result and save to system table compaction_history
-         long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-         long startsize = SSTableReader.getTotalBytes(oldSStables);
-         long endsize = SSTableReader.getTotalBytes(newSStables);
-         double ratio = (double) endsize / (double) startsize;
- 
-         StringBuilder newSSTableNames = new StringBuilder();
-         for (SSTableReader reader : newSStables)
-             newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
- 
-         double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-         long totalSourceRows = 0;
-         long[] counts = ci.getMergedRowCounts();
-         StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-         Map<Integer, Long> mergedRows = new HashMap<>();
-         for (int i = 0; i < counts.length; i++)
-         {
-             long count = counts[i];
-             if (count == 0)
-                 continue;
- 
-             int rows = i + 1;
-             totalSourceRows += rows * count;
-             mergeSummary.append(String.format("%d:%d, ", rows, count));
-             mergedRows.put(rows, count);
-         }
- 
-         SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-         logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                   taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-         logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-         logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
      }
  
      private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
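
The restructured compaction loop above leans on try-with-resources nesting: the
CompactionController is opened in an outer try and the ScannerList in an inner one, so
both are closed even on the early return taken when the iterator is empty. A minimal
illustration of the close ordering, with two hypothetical resources rather than the
Cassandra classes:

    public class TryWithResourcesOrdering
    {
        static class Controller implements AutoCloseable
        {
            public void close() { System.out.println("controller closed"); }
        }

        static class Scanners implements AutoCloseable
        {
            public void close() { System.out.println("scanners closed"); }
        }

        static void compact(boolean nothingToDo)
        {
            try (Controller controller = new Controller())
            {
                try (Scanners scanners = new Scanners())
                {
                    if (nothingToDo)
                        return;                        // both resources are still closed
                    System.out.println("compacting");
                }
            }
        }

        public static void main(String[] args)
        {
            compact(true);   // prints: scanners closed, controller closed
            compact(false);  // prints: compacting, scanners closed, controller closed
        }
    }

Resources close in reverse order of acquisition, inner block first, whether the method
returns, throws, or falls through.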

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------


[2/3] git commit: Fix resource leak in event of corrupt sstable

Posted by be...@apache.org.
Fix resource leak in event of corrupt sstable

patch by benedict; review by yukim for CASSANDRA-7932


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e831007
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e831007
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e831007

Branch: refs/heads/trunk
Commit: 0e831007760bffced8687f51b99525b650d7e193
Parents: c5c0585
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Sep 19 18:17:19 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 19 18:17:19 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   5 +-
 .../compaction/AbstractCompactionStrategy.java  |  56 ++++-
 .../db/compaction/CompactionManager.java        | 193 +++++++++---------
 .../cassandra/db/compaction/CompactionTask.java | 203 +++++++++----------
 .../compaction/LeveledCompactionStrategy.java   |  43 ++--
 .../cassandra/db/compaction/Upgrader.java       |   3 +-
 .../cassandra/utils/CloseableIterator.java      |   2 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 9 files changed, 286 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d3ee7d9..f55e5d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * Fix resource leak in event of corrupt sstable
  * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
  * Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
  * Invalidate prepared statements when their keyspace or table is

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 857e8bd..24ea9dd 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -320,7 +320,7 @@ public class DataTracker
     void removeUnreadableSSTables(File directory)
     {
         View currentView, newView;
-        List<SSTableReader> remaining = new ArrayList<>();
+        Set<SSTableReader> remaining = new HashSet<>();
         do
         {
             currentView = view.get();
@@ -334,6 +334,9 @@ public class DataTracker
             newView = currentView.replace(currentView.sstables, remaining);
         }
         while (!view.compareAndSet(currentView, newView));
+        for (SSTableReader sstable : currentView.sstables)
+            if (!remaining.contains(sstable))
+                sstable.releaseReference();
         notifySSTablesChanged(remaining, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
     }
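
The DataTracker fix above pairs a compare-and-set swap of the live view with a second
pass that releases exactly the readers dropped from it. A reduced sketch of that
pattern; Resource here is a made-up stand-in for SSTableReader, keeping only the
releaseReference call used in the hunk:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Predicate;

    public class CasRemoveAndRelease
    {
        // Hypothetical reference-counted resource.
        static class Resource
        {
            final String name;
            Resource(String name) { this.name = name; }
            void releaseReference() { System.out.println("released " + name); }
        }

        final AtomicReference<Set<Resource>> view = new AtomicReference<Set<Resource>>(new HashSet<Resource>());

        // Removes every resource matching the predicate with a CAS loop, then releases
        // only the ones that were actually dropped from the view.
        void removeMatching(Predicate<Resource> unreadable)
        {
            Set<Resource> current, remaining;
            do
            {
                current = view.get();
                remaining = new HashSet<>();
                for (Resource r : current)
                    if (!unreadable.test(r))
                        remaining.add(r);
            }
            while (!view.compareAndSet(current, remaining));

            for (Resource r : current)
                if (!remaining.contains(r))
                    r.releaseReference();
        }
    }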
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1bbc93d..97696a8 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
@@ -264,16 +265,61 @@ public abstract class AbstractCompactionStrategy
     * allow for a more memory-efficient solution if we know the sstables don't overlap (see
      * LeveledCompactionStrategy for instance).
      */
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         RateLimiter limiter = CompactionManager.instance.getRateLimiter();
         ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
-        for (SSTableReader sstable : sstables)
-            scanners.add(sstable.getScanner(range, limiter));
-        return scanners;
+        try
+        {
+            for (SSTableReader sstable : sstables)
+                scanners.add(sstable.getScanner(range, limiter));
+        }
+        catch (Throwable t)
+        {
+            try
+            {
+                new ScannerList(scanners).close();
+            }
+            catch (Throwable t2)
+            {
+                t.addSuppressed(t2);
+            }
+            throw t;
+        }
+        return new ScannerList(scanners);
+    }
+
+    public static class ScannerList implements AutoCloseable
+    {
+        public final List<ICompactionScanner> scanners;
+        public ScannerList(List<ICompactionScanner> scanners)
+        {
+            this.scanners = scanners;
+        }
+
+        public void close()
+        {
+            Throwable t = null;
+            for (ICompactionScanner scanner : scanners)
+            {
+                try
+                {
+                    scanner.close();
+                }
+                catch (Throwable t2)
+                {
+                    if (t == null)
+                        t = t2;
+                    else
+                        t.addSuppressed(t2);
+                }
+            }
+            if (t != null)
+                throw Throwables.propagate(t);
+        }
     }
 
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> toCompact)
+    public ScannerList getScanners(Collection<SSTableReader> toCompact)
     {
         return getScanners(toCompact, null);
     }
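
ScannerList.close() above attempts to close every scanner even when some of them fail,
keeping the first Throwable and attaching later ones via addSuppressed before
rethrowing. The same idea as a generic helper; the patch rethrows through Guava's
Throwables.propagate, whereas this sketch simply wraps the failure in a
RuntimeException:

    import java.util.List;

    public final class CloseAll
    {
        // Closes every resource; the first failure is rethrown only after all closes
        // have been attempted, with later failures attached via addSuppressed().
        public static void closeAll(List<? extends AutoCloseable> resources)
        {
            Throwable first = null;
            for (AutoCloseable resource : resources)
            {
                try
                {
                    resource.close();
                }
                catch (Throwable t)
                {
                    if (first == null)
                        first = t;
                    else
                        first.addSuppressed(t);
                }
            }
            if (first != null)
                throw new RuntimeException(first);
        }
    }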

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 296fe45..e309cfb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -870,89 +870,98 @@ public class CompactionManager implements CompactionManagerMBean
         if (!cfs.isValid())
             return;
 
-        Collection<SSTableReader> sstables;
-        String snapshotName = validator.desc.sessionId.toString();
-        int gcBefore;
-        boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
-        if (isSnapshotValidation)
-        {
-            // If there is a snapshot created for the session then read from there.
-            sstables = cfs.getSnapshotSSTableReader(snapshotName);
-
-            // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
-            // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
-            // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
-            // 'as good as in the non-snapshot' case)
-            gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
-        }
-        else
+        Collection<SSTableReader> sstables = null;
+        try
         {
-            // flush first so everyone is validating data that is as similar as possible
-            StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-            // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
-            // instead so they won't be cleaned up if they do get compacted during the validation
-            if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
-                sstables = cfs.markCurrentSSTablesReferenced();
-            else
-                sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 
-            if (validator.gcBefore > 0)
-                gcBefore = validator.gcBefore;
+            String snapshotName = validator.desc.sessionId.toString();
+            int gcBefore;
+            boolean isSnapshotValidation = cfs.snapshotExists(snapshotName);
+            if (isSnapshotValidation)
+            {
+                // If there is a snapshot created for the session then read from there.
+                sstables = cfs.getSnapshotSSTableReader(snapshotName);
+
+                // Computing gcbefore based on the current time wouldn't be very good because we know each replica will execute
+                // this at a different time (that's the whole purpose of repair with snapshot). So instead we take the creation
+                // time of the snapshot, which should give us roughly the same time on each replica (roughly being in that case
+                // 'as good as in the non-snapshot' case)
+                gcBefore = cfs.gcBefore(cfs.getSnapshotCreationTime(snapshotName));
+            }
             else
-                gcBefore = getDefaultGcBefore(cfs);
-        }
-
-        // Create Merkle tree suitable to hold estimated partitions for given range.
-        // We blindly assume that partition is evenly distributed on all sstables for now.
-        long numPartitions = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
-        }
-        // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
-        int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
-        MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+            {
+                // flush first so everyone is validating data that is as similar as possible
+                StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
+                // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
+                // instead so they won't be cleaned up if they do get compacted during the validation
+                if (validator.desc.parentSessionId == null || ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId) == null)
+                    sstables = cfs.markCurrentSSTablesReferenced();
+                else
+                    sstables = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).getAndReferenceSSTables(cfs.metadata.cfId);
 
-        CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.desc.range, gcBefore);
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+                if (validator.gcBefore > 0)
+                    gcBefore = validator.gcBefore;
+                else
+                    gcBefore = getDefaultGcBefore(cfs);
+            }
 
-        long start = System.nanoTime();
-        metrics.beginCompaction(ci);
-        try
-        {
-            // validate the CF as we iterate over it
-            validator.prepare(cfs, tree);
-            while (iter.hasNext())
+            // Create Merkle tree suitable to hold estimated partitions for given range.
+            // We blindly assume that partition is evenly distributed on all sstables for now.
+            long numPartitions = 0;
+            for (SSTableReader sstable : sstables)
             {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
-                AbstractCompactedRow row = iter.next();
-                validator.add(row);
+                numPartitions += sstable.estimatedKeysForRanges(Collections.singleton(validator.desc.range));
             }
-            validator.complete();
-        }
-        finally
-        {
-            iter.close();
-            SSTableReader.releaseReferences(sstables);
-            if (isSnapshotValidation)
+            // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
+            int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
+            MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+
+            long start = System.nanoTime();
+            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables, validator.desc.range))
             {
-                cfs.clearSnapshot(snapshotName);
+                CompactionIterable ci = new ValidationCompactionIterable(cfs, scanners.scanners, gcBefore);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+                metrics.beginCompaction(ci);
+                try
+                {
+                    // validate the CF as we iterate over it
+                    validator.prepare(cfs, tree);
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+                        AbstractCompactedRow row = iter.next();
+                        validator.add(row);
+                    }
+                    validator.complete();
+                }
+                finally
+                {
+                    if (isSnapshotValidation)
+                    {
+                        cfs.clearSnapshot(snapshotName);
+                    }
+
+                    metrics.finishCompaction(ci);
+                }
             }
 
-            metrics.finishCompaction(ci);
+            if (logger.isDebugEnabled())
+            {
+                // MT serialize may take time
+                long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
+                             duration,
+                             depth,
+                             numPartitions,
+                             MerkleTree.serializer.serializedSize(tree, 0),
+                             validator.desc);
+            }
         }
-
-        if (logger.isDebugEnabled())
+        finally
         {
-            // MT serialize may take time
-            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-            logger.debug("Validation finished in {} msec, depth {} for {} keys, serialized size {} bytes for {}",
-                         duration,
-                         depth,
-                         numPartitions,
-                         MerkleTree.serializer.serializedSize(tree, 0),
-                         validator.desc);
+            if (sstables != null)
+                SSTableReader.releaseReferences(sstables);
         }
     }
 
@@ -993,32 +1002,28 @@ public class CompactionManager implements CompactionManagerMBean
             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
 
             AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-            List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
-
-            try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
+                 CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
             {
                 repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
                 unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
 
-                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
-
-                try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+                while(iter.hasNext())
                 {
-                    while(iter.hasNext())
+                    AbstractCompactedRow row = iter.next();
+                    // if current range from sstable is repaired, save it into the new repaired sstable
+                    if (Range.isInRanges(row.key.getToken(), ranges))
                     {
-                        AbstractCompactedRow row = iter.next();
-                        // if current range from sstable is repaired, save it into the new repaired sstable
-                        if (Range.isInRanges(row.key.getToken(), ranges))
-                        {
-                            repairedSSTableWriter.append(row);
-                            repairedKeyCount++;
-                        }
-                        // otherwise save into the new 'non-repaired' table
-                        else
-                        {
-                            unRepairedSSTableWriter.append(row);
-                            unrepairedKeyCount++;
-                        }
+                        repairedSSTableWriter.append(row);
+                        repairedKeyCount++;
+                    }
+                    // otherwise save into the new 'non-repaired' table
+                    else
+                    {
+                        unRepairedSSTableWriter.append(row);
+                        unrepairedKeyCount++;
                     }
                 }
                 // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
@@ -1109,11 +1114,9 @@ public class CompactionManager implements CompactionManagerMBean
 
     private static class ValidationCompactionIterable extends CompactionIterable
     {
-        public ValidationCompactionIterable(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, Range<Token> range, int gcBefore)
+        public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore)
         {
-            super(OperationType.VALIDATION,
-                  cfs.getCompactionStrategy().getScanners(sstables, range),
-                  new ValidationCompactionController(cfs, gcBefore));
+            super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore));
         }
     }
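
The doValidationCompaction rewrite above follows an acquire/use/release shape: sstable
references are taken first, the scanners live inside a try-with-resources, and the
references are released in an outer finally that runs even if scanner construction or
tree building fails partway through. A stripped-down sketch under those assumptions
(Ref, ScannerList and acquireRefs are hypothetical placeholders):

    import java.util.ArrayList;
    import java.util.List;

    public class AcquireUseReleaseExample
    {
        static class Ref
        {
            void release() { System.out.println("released"); }
        }

        static class ScannerList implements AutoCloseable
        {
            public void close() { System.out.println("scanners closed"); }
        }

        static void validate() throws Exception
        {
            List<Ref> refs = null;
            try
            {
                refs = acquireRefs();                  // may fail in a real system; refs stays null
                try (ScannerList scanners = new ScannerList())
                {
                    // iterate the scanners and feed the validator here
                }
            }
            finally
            {
                if (refs != null)                      // release only what was acquired
                    for (Ref ref : refs)
                        ref.release();
            }
        }

        static List<Ref> acquireRefs()
        {
            List<Ref> refs = new ArrayList<>();
            refs.add(new Ref());
            return refs;
        }
    }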
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c1c5504..6217348 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -129,9 +130,6 @@ public class CompactionTask extends AbstractCompactionTask
 
         UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
 
-        CompactionController controller = getCompactionController(sstables);
-        Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
-
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
         // all the sstables (that existed when we started)
@@ -139,120 +137,117 @@ public class CompactionTask extends AbstractCompactionTask
 
         long start = System.nanoTime();
         long totalKeysWritten = 0;
-        long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
-        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
-        long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
-        logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-
-        // TODO: errors when creating the scanners can result in untidied resources
-        AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
-        CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
-        // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
-        // replace the old entries.  Track entries to preheat here until then.
-        long minRepairedAt = getMinRepairedAt(actuallyCompact);
-        // we only need the age of the data that we're actually retaining
-        long maxAge = getMaxDataAge(actuallyCompact);
-        if (collector != null)
-            collector.beginCompaction(ci);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
-        try
+
+        try (CompactionController controller = getCompactionController(sstables);)
         {
-            if (!iter.hasNext())
-            {
-                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                // we need to sync it (via closeAndOpen) first, so there is no period during which
-                // a crash could cause data loss.
-                cfs.markObsolete(sstables, compactionType);
-                return;
-            }
 
-            writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
-            while (iter.hasNext())
-            {
-                if (ci.isStopRequested())
-                    throw new CompactionInterruptedException(ci.getCompactionInfo());
+            Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+            long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+            long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+            long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+            logger.debug("Expected bloom filter size : {}", keysPerSSTable);
 
-                AbstractCompactedRow row = iter.next();
-                if (writer.append(row) != null)
+            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+            {
+                AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                Iterator<AbstractCompactedRow> iter = ci.iterator();
+
+                // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+                // replace the old entries.  Track entries to preheat here until then.
+                long minRepairedAt = getMinRepairedAt(actuallyCompact);
+                // we only need the age of the data that we're actually retaining
+                long maxAge = getMaxDataAge(actuallyCompact);
+                if (collector != null)
+                    collector.beginCompaction(ci);
+                SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+                try
                 {
-                    totalKeysWritten++;
-                    if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                    if (!iter.hasNext())
                     {
-                        writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                        // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                        // we need to sync it (via closeAndOpen) first, so there is no period during which
+                        // a crash could cause data loss.
+                        cfs.markObsolete(sstables, compactionType);
+                        return;
                     }
+
+                    writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                    while (iter.hasNext())
+                    {
+                        if (ci.isStopRequested())
+                            throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+                        AbstractCompactedRow row = iter.next();
+                        if (writer.append(row) != null)
+                        {
+                            totalKeysWritten++;
+                            if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                            {
+                                writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                            }
+                        }
+                    }
+
+                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                    writer.finish(false);
                 }
-            }
+                catch (Throwable t)
+                {
+                    writer.abort();
+                    throw t;
+                }
+                finally
+                {
 
-            // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-            writer.finish(false);
-        }
-        catch (Throwable t)
-        {
-            writer.abort();
-            throw t;
-        }
-        finally
-        {
-            controller.close();
+                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                    // (in replaceCompactedSSTables)
+                    if (taskId != null)
+                        SystemKeyspace.finishCompaction(taskId);
 
-            // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-            // (in replaceCompactedSSTables)
-            if (taskId != null)
-                SystemKeyspace.finishCompaction(taskId);
+                    if (collector != null)
+                        collector.finishCompaction(ci);
+                }
 
-            if (collector != null)
-                collector.finishCompaction(ci);
+                Collection<SSTableReader> oldSStables = this.sstables;
+                List<SSTableReader> newSStables = writer.finished();
+                if (!offline)
+                    cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+                // log a bunch of statistics about the result and save to system table compaction_history
+                long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                long startsize = SSTableReader.getTotalBytes(oldSStables);
+                long endsize = SSTableReader.getTotalBytes(newSStables);
+                double ratio = (double) endsize / (double) startsize;
+
+                StringBuilder newSSTableNames = new StringBuilder();
+                for (SSTableReader reader : newSStables)
+                    newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+                double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+                long totalSourceRows = 0;
+                long[] counts = ci.getMergedRowCounts();
+                StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+                Map<Integer, Long> mergedRows = new HashMap<>();
+                for (int i = 0; i < counts.length; i++)
+                {
+                    long count = counts[i];
+                    if (count == 0)
+                        continue;
+
+                    int rows = i + 1;
+                    totalSourceRows += rows * count;
+                    mergeSummary.append(String.format("%d:%d, ", rows, count));
+                    mergedRows.put(rows, count);
+                }
 
-            try
-            {
-                // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
-                // we don't end up with compaction information hanging around indefinitely in limbo.
-                iter.close();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
+                SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+                logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
+                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+                logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+                logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
             }
         }
-
-        Collection<SSTableReader> oldSStables = this.sstables;
-        List<SSTableReader> newSStables = writer.finished();
-        if (!offline)
-            cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
-
-        // log a bunch of statistics about the result and save to system table compaction_history
-        long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        long startsize = SSTableReader.getTotalBytes(oldSStables);
-        long endsize = SSTableReader.getTotalBytes(newSStables);
-        double ratio = (double) endsize / (double) startsize;
-
-        StringBuilder newSSTableNames = new StringBuilder();
-        for (SSTableReader reader : newSStables)
-            newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
-        double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-        long totalSourceRows = 0;
-        long[] counts = ci.getMergedRowCounts();
-        StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-        Map<Integer, Long> mergedRows = new HashMap<>();
-        for (int i = 0; i < counts.length; i++)
-        {
-            long count = counts[i];
-            if (count == 0)
-                continue;
-
-            int rows = i + 1;
-            totalSourceRows += rows * count;
-            mergeSummary.append(String.format("%d:%d, ", rows, count));
-            mergedRows.put(rows, count);
-        }
-
-        SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-        logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                  oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-        logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-        logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
     }
 
     private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3ee59ad..7f2d881 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -198,7 +198,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         return maxSSTableSizeInMB * 1024L * 1024L;
     }
 
-    public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
@@ -210,26 +210,41 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         }
 
         List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
-        for (Integer level : byLevel.keySet())
+        try
         {
-            // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
-            // since we don't know which level those sstables belong to yet, we simply do the same as L0 sstables.
-            if (level <= 0)
+            for (Integer level : byLevel.keySet())
             {
-                // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner for each
-                for (SSTableReader sstable : byLevel.get(level))
-                    scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+                // level can be -1 when sstables are added to DataTracker but not to LeveledManifest
+                // since we don't know which level those sstables belong to yet, we simply do the same as L0 sstables.
+                if (level <= 0)
+                {
+                    // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner for each
+                    for (SSTableReader sstable : byLevel.get(level))
+                        scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
+                }
+                else
+                {
+                    // Create a LeveledScanner that only opens one sstable at a time, in sorted order
+                    List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
+                    if (!intersecting.isEmpty())
+                        scanners.add(new LeveledScanner(intersecting, range));
+                }
             }
-            else
+        }
+        catch (Throwable t)
+        {
+            try
+            {
+                new ScannerList(scanners).close();
+            }
+            catch (Throwable t2)
             {
-                // Create a LeveledScanner that only opens one sstable at a time, in sorted order
-                List<SSTableReader> intersecting = LeveledScanner.intersecting(byLevel.get(level), range);
-                if (!intersecting.isEmpty())
-                    scanners.add(new LeveledScanner(intersecting, range));
+                t.addSuppressed(t2);
             }
+            throw t;
         }
 
-        return scanners;
+        return new ScannerList(scanners);
     }
 
     // Lazily creates SSTableBoundedScanner for sstables that are assumed to be from the
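
The LeveledCompactionStrategy change above also guards scanner construction itself: if
opening a later scanner throws, the ones already opened are closed before the failure
propagates, instead of being leaked. A generic version of that build-or-clean-up loop
(the Scanner type and open method are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class OpenAllOrCloseExample
    {
        interface Scanner extends AutoCloseable { }

        // Opens a scanner per input; if any open fails, every scanner opened so far is
        // closed and the original failure is rethrown with close failures suppressed.
        static List<Scanner> openAll(List<String> inputs) throws Exception
        {
            List<Scanner> scanners = new ArrayList<>();
            try
            {
                for (String input : inputs)
                    scanners.add(open(input));
            }
            catch (Throwable t)
            {
                for (Scanner scanner : scanners)
                {
                    try { scanner.close(); }
                    catch (Throwable t2) { t.addSuppressed(t2); }
                }
                throw t;
            }
            return scanners;
        }

        static Scanner open(String input) throws Exception
        {
            return () -> System.out.println("closed " + input);   // trivial stand-in scanner
        }
    }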

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 734fe23..f102fef 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -88,8 +88,9 @@ public class Upgrader
         outputHandler.output("Upgrading " + sstable);
 
         SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(this.toUpgrade), OperationType.UPGRADE_SSTABLES, true);
-        try (CloseableIterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, strategy.getScanners(this.toUpgrade), controller).iterator())
+        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(this.toUpgrade))
         {
+            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/src/java/org/apache/cassandra/utils/CloseableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CloseableIterator.java b/src/java/org/apache/cassandra/utils/CloseableIterator.java
index 399c6d1..7474f3d 100644
--- a/src/java/org/apache/cassandra/utils/CloseableIterator.java
+++ b/src/java/org/apache/cassandra/utils/CloseableIterator.java
@@ -21,6 +21,6 @@ import java.io.Closeable;
 import java.util.Iterator;
 
 // so we can instantiate anonymous classes implementing both interfaces
-public interface CloseableIterator<T> extends Iterator<T>, Closeable
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable
 {
 }
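
The one-line CloseableIterator change above is what allows these iterators to appear in
a try-with-resources header, since only AutoCloseable types may be declared there;
Closeable is kept so existing callers can keep treating them as ordinary closeable
streams. A tiny self-contained illustration with a made-up implementation:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class CloseableIteratorExample
    {
        // Same shape as the patched interface.
        interface CloseableIterator<T> extends Iterator<T>, AutoCloseable, Closeable
        {
        }

        static CloseableIterator<String> iterate(List<String> items)
        {
            final Iterator<String> it = items.iterator();
            return new CloseableIterator<String>()
            {
                public boolean hasNext() { return it.hasNext(); }
                public String next()     { return it.next(); }
                public void close()      { System.out.println("iterator closed"); }
            };
        }

        public static void main(String[] args) throws IOException
        {
            try (CloseableIterator<String> iter = iterate(Arrays.asList("a", "b")))
            {
                while (iter.hasNext())
                    System.out.println(iter.next());
            }   // close() runs here even if iteration throws
        }
    }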

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e831007/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index defb087..65c7b69 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -143,7 +143,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
 
         // get LeveledScanner for level 1 sstables
         Collection<SSTableReader> sstables = strategy.manifest.getLevel(1);
-        List<ICompactionScanner> scanners = strategy.getScanners(sstables);
+        List<ICompactionScanner> scanners = strategy.getScanners(sstables).scanners;
         assertEquals(1, scanners.size()); // should be one per level
         ICompactionScanner scanner = scanners.get(0);
         // scan through to the end