Posted to commits@cassandra.apache.org by jm...@apache.org on 2014/12/16 22:11:39 UTC

[1/5] cassandra git commit: Fix ref counting race between SSTableScanner and SSTR

Repository: cassandra
Updated Branches:
  refs/heads/trunk e73fccdcd -> 32ac6af2b


Fix ref counting race between SSTableScanner and SSTR

Patch by jmckenzie; reviewed by marcuse for CASSANDRA-8399
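
The race, in short: SSTableScanner's constructors called sstable.acquireReference() unconditionally, so a scanner could be built against an SSTableReader (SSTR) whose reference count had already dropped to zero during compaction, i.e. whose backing files were eligible for deletion. The fix below hides the constructors behind static factories that only construct a real scanner when the reference is successfully acquired, returning an empty scanner otherwise. A minimal sketch of that guarded-acquire pattern, with hypothetical, simplified types standing in for the real classes:

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical, simplified stand-ins for SSTableReader and its scanners;
    // the point is only the acquire-before-construct pattern.
    final class GuardedScannerSketch
    {
        interface Scanner extends AutoCloseable
        {
            boolean hasNext();
            void close();
        }

        static final class Reader
        {
            final AtomicInteger refs = new AtomicInteger(1);

            // CAS loop: succeeds only while the count is still positive.
            boolean acquireReference()
            {
                for (int n = refs.get(); n > 0; n = refs.get())
                    if (refs.compareAndSet(n, n + 1))
                        return true;
                return false; // already at 0: backing files may be deleted
            }

            void releaseReference()
            {
                refs.decrementAndGet();
            }
        }

        // Mirrors the shape of the new SSTableScanner.getScanner factories:
        // acquire the reference *before* constructing anything, and fall back
        // to a no-op scanner when the reader was already released.
        static Scanner open(final Reader reader)
        {
            if (!reader.acquireReference())
                return new Scanner() // empty scanner: nothing to read, nothing to release
                {
                    public boolean hasNext() { return false; }
                    public void close() { }
                };
            return new Scanner()
            {
                public boolean hasNext() { return false; } // real row iteration elided
                public void close() { reader.releaseReference(); }
            };
        }
    }

The essential change is that acquireReference() is allowed to fail, and the factory turns that failure into a harmless no-op object instead of a use-after-release.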


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fec4a42
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fec4a42
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fec4a42

Branch: refs/heads/trunk
Commit: 1fec4a4281be94f8ef2f9f8a5eaccee56d70e87e
Parents: 9871914
Author: Joshua McKenzie <jm...@apache.org>
Authored: Tue Dec 16 14:37:07 2014 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Tue Dec 16 14:37:07 2014 -0600

----------------------------------------------------------------------
 .../compaction/AbstractCompactionIterable.java  |  7 +-
 .../compaction/AbstractCompactionStrategy.java  | 10 +--
 .../db/compaction/CompactionIterable.java       |  5 +-
 .../db/compaction/CompactionManager.java        | 22 +++---
 .../db/compaction/ICompactionScanner.java       | 34 ---------
 .../compaction/LeveledCompactionStrategy.java   |  7 +-
 .../compaction/WrappingCompactionStrategy.java  |  4 +-
 .../cassandra/io/sstable/SSTableReader.java     | 61 +++-------------
 .../cassandra/io/sstable/SSTableScanner.java    | 73 +++++++++++++++++---
 .../apache/cassandra/tools/SSTableExport.java   |  2 +-
 .../db/compaction/AntiCompactionTest.java       |  9 +--
 .../db/compaction/CompactionsTest.java          |  7 +-
 .../LeveledCompactionStrategyTest.java          |  5 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |  4 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |  3 +-
 .../io/sstable/SSTableRewriterTest.java         | 24 +++----
 .../io/sstable/SSTableScannerTest.java          | 17 +++--
 .../cassandra/io/sstable/SSTableUtils.java      |  4 +-
 18 files changed, 130 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index e9f063f..5ac2c8b 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.utils.CloseableIterator;
 
 public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
@@ -28,7 +29,7 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
     protected final CompactionController controller;
     protected final long totalBytes;
     protected volatile long bytesRead;
-    protected final List<ICompactionScanner> scanners;
+    protected final List<ISSTableScanner> scanners;
     /*
      * counters for merged rows.
      * array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row),
@@ -36,7 +37,7 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
      */
     protected final AtomicLong[] mergeCounters;
 
-    public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ICompactionScanner> scanners)
+    public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ISSTableScanner> scanners)
     {
         this.controller = controller;
         this.type = type;
@@ -44,7 +45,7 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
         this.bytesRead = 0;
 
         long bytes = 0;
-        for (ICompactionScanner scanner : scanners)
+        for (ISSTableScanner scanner : scanners)
             bytes += scanner.getLengthInBytes();
         this.totalBytes = bytes;
         mergeCounters = new AtomicLong[scanners.size()];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index bf136b9..337657d 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -22,7 +22,6 @@ import java.util.*;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
@@ -34,6 +33,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
@@ -269,7 +269,7 @@ public abstract class AbstractCompactionStrategy
     public ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
     {
         RateLimiter limiter = CompactionManager.instance.getRateLimiter();
-        ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
+        ArrayList<ISSTableScanner> scanners = new ArrayList<ISSTableScanner>();
         try
         {
             for (SSTableReader sstable : sstables)
@@ -306,8 +306,8 @@ public abstract class AbstractCompactionStrategy
 
     public static class ScannerList implements AutoCloseable
     {
-        public final List<ICompactionScanner> scanners;
-        public ScannerList(List<ICompactionScanner> scanners)
+        public final List<ISSTableScanner> scanners;
+        public ScannerList(List<ISSTableScanner> scanners)
         {
             this.scanners = scanners;
         }
@@ -315,7 +315,7 @@ public abstract class AbstractCompactionStrategy
         public void close()
         {
             Throwable t = null;
-            for (ICompactionScanner scanner : scanners)
+            for (ISSTableScanner scanner : scanners)
             {
                 try
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 0c9b52a..fdcec6e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -24,6 +24,7 @@ import java.util.List;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.MergeIterator;
 
@@ -37,7 +38,7 @@ public class CompactionIterable extends AbstractCompactionIterable
         }
     };
 
-    public CompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller)
+    public CompactionIterable(OperationType type, List<ISSTableScanner> scanners, CompactionController controller)
     {
         super(controller, type, scanners);
     }
@@ -77,7 +78,7 @@ public class CompactionIterable extends AbstractCompactionIterable
             {
                 rows.clear();
                 long n = 0;
-                for (ICompactionScanner scanner : scanners)
+                for (ISSTableScanner scanner : scanners)
                     n += scanner.getCurrentPosition();
                 bytesRead = n;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index d85ffd7..3977d9c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -45,7 +44,6 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ConcurrentHashMultiset;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
@@ -73,11 +71,7 @@ import org.apache.cassandra.db.index.SecondaryIndexBuilder;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
@@ -696,7 +690,7 @@ public class CompactionManager implements CompactionManagerMBean
         if (compactionFileLocation == null)
             throw new IOException("disk full");
 
-        ICompactionScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
+        ISSTableScanner scanner = cleanupStrategy.getScanner(sstable, getRateLimiter());
         CleanupInfo ci = new CleanupInfo(sstable, scanner);
 
         metrics.beginCompaction(ci);
@@ -761,7 +755,7 @@ public class CompactionManager implements CompactionManagerMBean
                  : new Bounded(cfs, ranges);
         }
 
-        public abstract ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter);
+        public abstract ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter);
         public abstract SSTableIdentityIterator cleanup(SSTableIdentityIterator row);
 
         private static final class Bounded extends CleanupStrategy
@@ -782,7 +776,7 @@ public class CompactionManager implements CompactionManagerMBean
 
             }
             @Override
-            public ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter)
+            public ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter)
             {
                 return sstable.getScanner(ranges, limiter);
             }
@@ -808,7 +802,7 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             @Override
-            public ICompactionScanner getScanner(SSTableReader sstable, RateLimiter limiter)
+            public ISSTableScanner getScanner(SSTableReader sstable, RateLimiter limiter)
             {
                 return sstable.getScanner(limiter);
             }
@@ -1122,7 +1116,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     private static class ValidationCompactionIterable extends CompactionIterable
     {
-        public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore)
+        public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ISSTableScanner> scanners, int gcBefore)
         {
             super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore));
         }
@@ -1288,9 +1282,9 @@ public class CompactionManager implements CompactionManagerMBean
     private static class CleanupInfo extends CompactionInfo.Holder
     {
         private final SSTableReader sstable;
-        private final ICompactionScanner scanner;
+        private final ISSTableScanner scanner;
 
-        public CleanupInfo(SSTableReader sstable, ICompactionScanner scanner)
+        public CleanupInfo(SSTableReader sstable, ISSTableScanner scanner)
         {
             this.sstable = sstable;
             this.scanner = scanner;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java b/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
deleted file mode 100644
index ebee3ed..0000000
--- a/src/java/org/apache/cassandra/db/compaction/ICompactionScanner.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.db.compaction;
-
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.utils.CloseableIterator;
-
-/**
- * An ICompactionScanner is an abstraction allowing multiple SSTableScanners to be
- * chained together under the hood.  See LeveledCompactionStrategy.getScanners.
- */
-public interface ICompactionScanner extends CloseableIterator<OnDiskAtomIterator>
-{
-    public long getLengthInBytes();
-    public long getCurrentPosition();
-    public String getBackingFiles();
-}
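
The interface is not dropped outright; judging from the imports and call sites elsewhere in this patch, it is relocated to org.apache.cassandra.io.sstable under the name ISSTableScanner with the same contract. A reconstruction, assuming only the name and package change:

    package org.apache.cassandra.io.sstable;

    import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
    import org.apache.cassandra.utils.CloseableIterator;

    // Assumed shape of ISSTableScanner, inferred from this diff: the contract
    // of ICompactionScanner, relocated out of db.compaction.
    public interface ISSTableScanner extends CloseableIterator<OnDiskAtomIterator>
    {
        public long getLengthInBytes();
        public long getCurrentPosition();
        public String getBackingFiles();
    }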

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index a560234..dbb9a13 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableReader;
 
 public class LeveledCompactionStrategy extends AbstractCompactionStrategy
@@ -167,7 +168,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
             byLevel.get(sstable.getSSTableLevel()).add(sstable);
         }
 
-        List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
+        List<ISSTableScanner> scanners = new ArrayList<ISSTableScanner>(sstables.size());
         try
         {
             for (Integer level : byLevel.keySet())
@@ -219,14 +220,14 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
 
 // Lazily creates SSTableBoundedScanners for sstables that are assumed to be from the
 // same level (i.e. non-overlapping) - see #4142
-    private static class LeveledScanner extends AbstractIterator<OnDiskAtomIterator> implements ICompactionScanner
+    private static class LeveledScanner extends AbstractIterator<OnDiskAtomIterator> implements ISSTableScanner
     {
         private final Range<Token> range;
         private final List<SSTableReader> sstables;
         private final Iterator<SSTableReader> sstableIterator;
         private final long totalLength;
 
-        private ICompactionScanner currentScanner;
+        private ISSTableScanner currentScanner;
         private long positionOffset;
 
         public LeveledScanner(Collection<SSTableReader> sstables, Range<Token> range)
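
As the comment above notes, a LeveledScanner presents the scanners of one level's non-overlapping sstables as a single ISSTableScanner, creating each underlying scanner lazily. A generic sketch of that lazy chaining (a hypothetical helper, not the actual class):

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    // Drains one underlying iterator at a time; safe for one level's sstables
    // because they do not overlap, so simple concatenation preserves key order.
    final class ChainedIterator<T> implements Iterator<T>
    {
        private final Iterator<? extends Iterator<T>> sources;
        private Iterator<T> current = Collections.emptyIterator();

        ChainedIterator(List<? extends Iterator<T>> sources)
        {
            this.sources = sources.iterator();
        }

        public boolean hasNext()
        {
            // Advance to the next underlying iterator only when needed.
            while (!current.hasNext() && sources.hasNext())
                current = sources.next();
            return current.hasNext();
        }

        public T next()
        {
            if (!hasNext())
                throw new NoSuchElementException();
            return current.next();
        }

        public void remove()
        {
            throw new UnsupportedOperationException();
        }
    }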

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index 1d713ef..84ef97f 100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,6 +30,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
@@ -318,7 +320,7 @@ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy
                 unrepairedSSTables.add(sstable);
         ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
         ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
-        List<ICompactionScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
+        List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
         scanners.addAll(repairedScanners.scanners);
         scanners.addAll(unrepairedScanners.scanners);
         return new ScannerList(scanners);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 0024f24..a8188ba 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -75,7 +75,6 @@ import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
@@ -1706,23 +1705,23 @@ public class SSTableReader extends SSTable
      * @param dataRange filter to use when reading the columns
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public SSTableScanner getScanner(DataRange dataRange)
+    public ISSTableScanner getScanner(DataRange dataRange)
     {
-        return new SSTableScanner(this, dataRange, null);
+        return SSTableScanner.getScanner(this, dataRange, null);
     }
 
     /**
      * I/O SSTableScanner
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public SSTableScanner getScanner()
+    public ISSTableScanner getScanner()
     {
         return getScanner((RateLimiter) null);
     }
 
-    public SSTableScanner getScanner(RateLimiter limiter)
+    public ISSTableScanner getScanner(RateLimiter limiter)
     {
-        return new SSTableScanner(this, DataRange.allData(partitioner), limiter);
+        return SSTableScanner.getScanner(this, DataRange.allData(partitioner), limiter);
     }
 
     /**
@@ -1731,7 +1730,7 @@ public class SSTableReader extends SSTable
      * @param range the range of keys to cover
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter)
+    public ISSTableScanner getScanner(Range<Token> range, RateLimiter limiter)
     {
         if (range == null)
             return getScanner(limiter);
@@ -1744,14 +1743,9 @@ public class SSTableReader extends SSTable
     * @param ranges the range of keys to cover
     * @return A Scanner for seeking over the rows of the SSTable.
     */
-    public ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
+    public ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
     {
-        // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
-        List<Pair<Long, Long>> positions = getPositionsForRanges(Range.normalize(ranges));
-        if (positions.isEmpty())
-            return new EmptyCompactionScanner(getFilename());
-        else
-            return new SSTableScanner(this, ranges, limiter);
+        return SSTableScanner.getScanner(this, ranges, limiter);
     }
 
     public FileDataInput getFileDataInput(long position)
@@ -2058,45 +2052,6 @@ public class SSTableReader extends SSTable
             readMeter.mark();
     }
 
-    protected class EmptyCompactionScanner implements ICompactionScanner
-    {
-        private final String filename;
-
-        public EmptyCompactionScanner(String filename)
-        {
-            this.filename = filename;
-        }
-
-        public long getLengthInBytes()
-        {
-            return 0;
-        }
-
-        public long getCurrentPosition()
-        {
-            return 0;
-        }
-
-        public String getBackingFiles()
-        {
-            return filename;
-        }
-
-        public boolean hasNext()
-        {
-            return false;
-        }
-
-        public OnDiskAtomIterator next()
-        {
-            return null;
-        }
-
-        public void close() throws IOException { }
-
-        public void remove() { }
-    }
-
     public static class SizeComparator implements Comparator<SSTableReader>
     {
         public int compare(SSTableReader o1, SSTableReader o2)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 3f1f1f0..5499195 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -18,10 +18,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.util.concurrent.RateLimiter;
@@ -33,7 +30,6 @@ import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
 import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
@@ -41,8 +37,9 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 
-public class SSTableScanner implements ICompactionScanner
+public class SSTableScanner implements ISSTableScanner
 {
     protected final RandomAccessReader dfile;
     protected final RandomAccessReader ifile;
@@ -55,15 +52,31 @@ public class SSTableScanner implements ICompactionScanner
 
     protected Iterator<OnDiskAtomIterator> iterator;
 
+    // We can race with sstable deletion during compaction.  If the reference count has already hit 0, skip the sstable and return an empty scanner
+    public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+    {
+        return sstable.acquireReference()
+                ? new SSTableScanner(sstable, dataRange, limiter)
+                : new SSTableScanner.EmptySSTableScanner(sstable.getFilename());
+    }
+    public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
+    {
+        // We want to avoid allocating an SSTableScanner if the ranges don't overlap the sstable (#5249)
+        List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges));
+        if (positions.isEmpty() || !sstable.acquireReference())
+            return new EmptySSTableScanner(sstable.getFilename());
+
+        return new SSTableScanner(sstable, tokenRanges, limiter);
+    }
+
     /**
      * @param sstable SSTable to scan; must not be null
      * @param dataRange a single range to scan; must not be null
      * @param limiter background i/o RateLimiter; may be null
      */
-    SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
+    private SSTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
     {
         assert sstable != null;
-        sstable.acquireReference();
 
         this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
@@ -90,10 +103,9 @@ public class SSTableScanner implements ICompactionScanner
      * @param tokenRanges A set of token ranges to scan
      * @param limiter background i/o RateLimiter; may be null
      */
-    SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
+    private SSTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
     {
         assert sstable != null;
-        sstable.acquireReference();
 
         this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
@@ -294,4 +306,45 @@ public class SSTableScanner implements ICompactionScanner
                " sstable=" + sstable +
                ")";
     }
+
+    public static class EmptySSTableScanner implements ISSTableScanner
+    {
+        private final String filename;
+
+        public EmptySSTableScanner(String filename)
+        {
+            this.filename = filename;
+        }
+
+        public long getLengthInBytes()
+        {
+            return 0;
+        }
+
+        public long getCurrentPosition()
+        {
+            return 0;
+        }
+
+        public String getBackingFiles()
+        {
+            return filename;
+        }
+
+        public boolean hasNext()
+        {
+            return false;
+        }
+
+        public OnDiskAtomIterator next()
+        {
+            return null;
+        }
+
+        public void close() throws IOException { }
+
+        public void remove() { }
+    }
+
+
 }
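
Because EmptySSTableScanner and the real scanner share the ISSTableScanner interface, and only the real one holds a reference to release in close(), call sites stay uniform. A sketch of the caller pattern (process(...) is a hypothetical stand-in for whatever the caller does with each row):

    // Works the same whether the factory handed back a real or an empty scanner;
    // try-with-resources guarantees any acquired reference is released.
    try (ISSTableScanner scanner = sstable.getScanner())
    {
        while (scanner.hasNext())
            process(scanner.next()); // hypothetical per-row handling
    }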

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index e178145..22aebdb 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -321,7 +321,7 @@ public class SSTableExport
             excludeSet = new HashSet<String>(Arrays.asList(excludes));
 
         SSTableIdentityIterator row;
-        SSTableScanner scanner = reader.getScanner();
+        ISSTableScanner scanner = reader.getScanner();
         try
         {
             outs.println("[");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 090839e..a09d8b4 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -40,10 +40,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.dht.BytesToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -58,7 +55,7 @@ public class AntiCompactionTest extends SchemaLoader
     private static final String CF = "Standard1";
 
     @Test
-    public void antiCompactOne() throws InterruptedException, ExecutionException, IOException
+    public void antiCompactOne() throws Exception
     {
         ColumnFamilyStore store = prepareColumnFamilyStore();
         Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
@@ -75,7 +72,7 @@ public class AntiCompactionTest extends SchemaLoader
         int nonRepairedKeys = 0;
         for (SSTableReader sstable : store.getSSTables())
         {
-            try (SSTableScanner scanner = sstable.getScanner())
+            try (ISSTableScanner scanner = sstable.getScanner())
             {
                 while (scanner.hasNext())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a1ecfab..4659b5c 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -41,10 +41,7 @@ import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.dht.BytesToken;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.service.StorageService;
@@ -161,7 +158,7 @@ public class CompactionsTest extends SchemaLoader
         // check that the shadowed column is gone
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         Range keyRange = new Range<RowPosition>(key, sstable.partitioner.getMinimumToken().maxKeyBound());
-        SSTableScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange));
+        ISSTableScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange));
         OnDiskAtomIterator iter = scanner.next();
         assertEquals(key, iter.getKey());
         assert iter.next() instanceof RangeTombstone;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index ebc6e86..4c2236b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
 import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
@@ -145,9 +146,9 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
 
         // get LeveledScanner for level 1 sstables
         Collection<SSTableReader> sstables = strategy.manifest.getLevel(1);
-        List<ICompactionScanner> scanners = strategy.getScanners(sstables).scanners;
+        List<ISSTableScanner> scanners = strategy.getScanners(sstables).scanners;
         assertEquals(1, scanners.size()); // should be one per level
-        ICompactionScanner scanner = scanners.get(0);
+        ISSTableScanner scanner = scanners.get(0);
         // scan through to the end
         while (scanner.hasNext())
             scanner.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index 4fe5cfb..678601b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@ -29,8 +29,8 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import java.util.Collections;
@@ -181,7 +181,7 @@ public class TTLExpiryTest extends SchemaLoader
         cfs.enableAutoCompaction(true);
         assertEquals(1, cfs.getSSTables().size());
         SSTableReader sstable = cfs.getSSTables().iterator().next();
-        SSTableScanner scanner = sstable.getScanner(DataRange.allData(sstable.partitioner));
+        ISSTableScanner scanner = sstable.getScanner(DataRange.allData(sstable.partitioner));
         assertTrue(scanner.hasNext());
         while(scanner.hasNext())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index a99aa0c..03b5553 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -56,7 +56,6 @@ import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.composites.Composites;
 import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.LocalToken;
@@ -347,7 +346,7 @@ public class SSTableReaderTest extends SchemaLoader
         boolean foundScanner = false;
         for (SSTableReader s : store.getSSTables())
         {
-            ICompactionScanner scanner = s.getScanner(new Range<Token>(t(0), t(1), s.partitioner), null);
+            ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1), s.partitioner), null);
             scanner.next(); // throws exception pre 5407
             foundScanner = true;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index c0a017e..ecf97c3 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -30,17 +29,14 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ArrayBackedSortedColumns;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.compaction.LazilyCompactedRow;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.SSTableSplitter;
@@ -75,7 +71,7 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
         {
-            ICompactionScanner scanner = scanners.scanners.get(0);
+            ISSTableScanner scanner = scanners.scanners.get(0);
             CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
             writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
             while(scanner.hasNext())
@@ -107,7 +103,7 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
         try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
         {
-            ICompactionScanner scanner = scanners.scanners.get(0);
+            ISSTableScanner scanner = scanners.scanners.get(0);
             CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
             writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
             while (scanner.hasNext())
@@ -172,7 +168,7 @@ public class SSTableRewriterTest extends SchemaLoader
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
-        try (ICompactionScanner scanner = s.getScanner();
+        try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
             while(scanner.hasNext())
@@ -221,7 +217,7 @@ public class SSTableRewriterTest extends SchemaLoader
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
-        try (ICompactionScanner scanner = s.getScanner();
+        try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
             while(scanner.hasNext())
@@ -264,7 +260,7 @@ public class SSTableRewriterTest extends SchemaLoader
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
-        try (ICompactionScanner scanner = s.getScanner();
+        try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
             while(scanner.hasNext())
@@ -307,7 +303,7 @@ public class SSTableRewriterTest extends SchemaLoader
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
-        try (ICompactionScanner scanner = s.getScanner();
+        try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
             while(scanner.hasNext())
@@ -352,7 +348,7 @@ public class SSTableRewriterTest extends SchemaLoader
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
-        try (ICompactionScanner scanner = s.getScanner();
+        try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
             while(scanner.hasNext())
@@ -395,7 +391,7 @@ public class SSTableRewriterTest extends SchemaLoader
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
-        try (ICompactionScanner scanner = s.getScanner();
+        try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
             while(scanner.hasNext())
@@ -434,7 +430,7 @@ public class SSTableRewriterTest extends SchemaLoader
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
         int files = 1;
-        try (ICompactionScanner scanner = s.getScanner();
+        try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
             while(scanner.hasNext())
@@ -513,7 +509,7 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
         SSTableWriter w = getWriter(cfs, s.descriptor.directory);
         rewriter.switchWriter(w);
-        try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
+        try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
             while (scanner.hasNext())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
index ff1a305..ff60481 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.BytesToken;
 import org.apache.cassandra.dht.Range;
@@ -78,7 +77,7 @@ public class SSTableScannerTest extends SchemaLoader
 
     private static void assertScanMatches(SSTableReader sstable, int scanStart, int scanEnd, int expectedStart, int expectedEnd)
     {
-        SSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter()));
+        ISSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter()));
         for (int i = expectedStart; i <= expectedEnd; i++)
             assertEquals(toKey(i), new String(scanner.next().getKey().getKey().array()));
         assertFalse(scanner.hasNext());
@@ -86,7 +85,7 @@ public class SSTableScannerTest extends SchemaLoader
 
     private static void assertScanEmpty(SSTableReader sstable, int scanStart, int scanEnd)
     {
-        SSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter()));
+        ISSTableScanner scanner = sstable.getScanner(new DataRange(boundsFor(scanStart, scanEnd), new IdentityQueryFilter()));
         assertFalse(String.format("scan of (%03d, %03d] should be empty", scanStart, scanEnd), scanner.hasNext());
     }
 
@@ -108,7 +107,7 @@ public class SSTableScannerTest extends SchemaLoader
         SSTableReader sstable = store.getSSTables().iterator().next();
 
         // full range scan
-        SSTableScanner scanner = sstable.getScanner();
+        ISSTableScanner scanner = sstable.getScanner();
         for (int i = 2; i < 10; i++)
             assertEquals(toKey(i), new String(scanner.next().getKey().getKey().array()));
 
@@ -135,7 +134,7 @@ public class SSTableScannerTest extends SchemaLoader
         assertScanEmpty(sstable, 10, 11);
     }
 
-    private static void assertScanContainsRanges(ICompactionScanner scanner, int ... rangePairs)
+    private static void assertScanContainsRanges(ISSTableScanner scanner, int ... rangePairs)
     {
         assert rangePairs.length % 2 == 0;
 
@@ -172,7 +171,7 @@ public class SSTableScannerTest extends SchemaLoader
         SSTableReader sstable = store.getSSTables().iterator().next();
 
         // full range scan
-        SSTableScanner fullScanner = sstable.getScanner();
+        ISSTableScanner fullScanner = sstable.getScanner();
         assertScanContainsRanges(fullScanner,
                                  2, 9,
                                  102, 109,
@@ -180,7 +179,7 @@ public class SSTableScannerTest extends SchemaLoader
 
 
         // scan all three ranges separately
-        ICompactionScanner scanner = sstable.getScanner(makeRanges(1, 9,
+        ISSTableScanner scanner = sstable.getScanner(makeRanges(1, 9,
                                                                    101, 109,
                                                                    201, 209),
                                                         null);
@@ -302,11 +301,11 @@ public class SSTableScannerTest extends SchemaLoader
         SSTableReader sstable = store.getSSTables().iterator().next();
 
         // full range scan
-        SSTableScanner fullScanner = sstable.getScanner();
+        ISSTableScanner fullScanner = sstable.getScanner();
         assertScanContainsRanges(fullScanner, 205, 205);
 
         // scan three ranges separately
-        ICompactionScanner scanner = sstable.getScanner(makeRanges(101, 109,
+        ISSTableScanner scanner = sstable.getScanner(makeRanges(101, 109,
                                                                    201, 209), null);
 
         // this will currently fail

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fec4a42/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index d39f968..b9a3821 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -69,8 +69,8 @@ public class SSTableUtils
 
     public static void assertContentEquals(SSTableReader lhs, SSTableReader rhs)
     {
-        SSTableScanner slhs = lhs.getScanner();
-        SSTableScanner srhs = rhs.getScanner();
+        ISSTableScanner slhs = lhs.getScanner();
+        ISSTableScanner srhs = rhs.getScanner();
         while (slhs.hasNext())
         {
             OnDiskAtomIterator ilhs = slhs.next();


[2/5] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by jm...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 2488f86,0000000..fc346d1
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@@ -1,256 -1,0 +1,251 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import com.google.common.util.concurrent.RateLimiter;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.RowIndexEntry;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.Descriptor;
++import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.Pair;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +
 +/**
 + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
 + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
 + */
 +public class BigTableReader extends SSTableReader
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);
 +
 +    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    public OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns)
 +    {
 +        return new SSTableNamesIterator(this, key, columns);
 +    }
 +
 +    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry )
 +    {
 +        return new SSTableNamesIterator(this, input, key, columns, indexEntry);
 +    }
 +
 +    public OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse)
 +    {
 +        return new SSTableSliceIterator(this, key, slices, reverse);
 +    }
 +
 +    public OnDiskAtomIterator iterator(FileDataInput input, DecoratedKey key, ColumnSlice[] slices, boolean reverse, RowIndexEntry indexEntry)
 +    {
 +        return new SSTableSliceIterator(this, input, key, slices, reverse, indexEntry);
 +    }
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
-     public ICompactionScanner getScanner(DataRange dataRange, RateLimiter limiter)
++    public ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter)
 +    {
-         return new BigTableScanner(this, dataRange, limiter);
++        return BigTableScanner.getScanner(this, dataRange, limiter);
 +    }
 +
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
-     public ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
++    public ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
 +    {
-         // We want to avoid allocating a SSTableScanner if the range don't overlap the sstable (#5249)
-         List<Pair<Long, Long>> positions = getPositionsForRanges(Range.normalize(ranges));
-         if (positions.isEmpty())
-             return new EmptyCompactionScanner(getFilename());
-         else
-             return new BigTableScanner(this, ranges, limiter);
++        return BigTableScanner.getScanner(this, ranges, limiter);
 +    }
 +
 +
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 +     * allow key selection by token bounds, but only if op != EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats)
 +    {
 +        // first, check bloom filter
 +        if (op == Operator.EQ)
 +        {
 +            assert key instanceof DecoratedKey; // EQ only make sense if the key is a valid row key
 +            if (!bf.isPresent(((DecoratedKey)key).getKey()))
 +            {
 +                Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation);
 +                return null;
 +            }
 +        }
 +
 +        // next, the key cache (only makes sense for a valid row key)
 +        if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
 +        {
 +            DecoratedKey decoratedKey = (DecoratedKey)key;
 +            KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, decoratedKey.getKey());
 +            RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
 +            if (cachedPosition != null)
 +            {
 +                Tracing.trace("Key cache hit for sstable {}", descriptor.generation);
 +                return cachedPosition;
 +            }
 +        }
 +
 +        // check the smallest and greatest keys in the sstable to see whether the key can be present at all
 +        if (first.compareTo(key) > 0 || last.compareTo(key) < 0)
 +        {
 +            if (op == Operator.EQ && updateCacheAndStats)
 +                bloomFilterTracker.addFalsePositive();
 +
 +            if (op.apply(1) < 0)
 +            {
 +                Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
 +                return null;
 +            }
 +        }
 +
 +        int binarySearchResult = indexSummary.binarySearch(key);
 +        long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
 +        int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
 +
 +        // if we matched the -1th position, we'll start at the first position
 +        sampledPosition = sampledPosition == -1 ? 0 : sampledPosition;
 +
 +        int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
 +
 +        // scan the on-disk index, starting at the nearest sampled position.
 +        // The check against IndexInterval is there to exit the loop in the EQ case when the key we are looking
 +        // for is not present (bloom filter false positive). But note that for non-EQ cases, we might need to
 +        // check the first key of the next index position, because the searched key can be greater than the last
 +        // key of the checked index interval while still being less than the first key of the next interval (and
 +        // in that case we must return the position of the first key of the next interval).
 +        int i = 0;
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext() && i <= effectiveInterval)
 +        {
 +            FileDataInput in = segments.next();
 +            try
 +            {
 +                while (!in.isEOF() && i <= effectiveInterval)
 +                {
 +                    i++;
 +
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +
 +                    boolean opSatisfied; // did we find an appropriate position for the op requested
 +                    boolean exactMatch; // is the current position an exact match for the key, suitable for caching
 +
 +                    // Compare raw keys if possible for performance, otherwise compare decorated keys.
 +                    if (op == Operator.EQ)
 +                    {
 +                        opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
 +                    }
 +                    else
 +                    {
 +                        DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                        int comparison = indexDecoratedKey.compareTo(key);
 +                        int v = op.apply(comparison);
 +                        opSatisfied = (v == 0);
 +                        exactMatch = (comparison == 0);
 +                        if (v < 0)
 +                        {
 +                            Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation);
 +                            return null;
 +                        }
 +                    }
 +
 +                    if (opSatisfied)
 +                    {
 +                        // read data position from index entry
 +                        RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, descriptor.version);
 +                        if (exactMatch && updateCacheAndStats)
 +                        {
 +                            assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
 +                            DecoratedKey decoratedKey = (DecoratedKey)key;
 +
 +                            if (logger.isTraceEnabled())
 +                            {
 +                                // expensive sanity check!  see CASSANDRA-4687
 +                                FileDataInput fdi = dfile.getSegment(indexEntry.position);
 +                                DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi));
 +                                if (!keyInDisk.equals(key))
 +                                    throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
 +                                fdi.close();
 +                            }
 +
 +                            // store exact match for the key
 +                            cacheKey(decoratedKey, indexEntry);
 +                        }
 +                        if (op == Operator.EQ && updateCacheAndStats)
 +                            bloomFilterTracker.addTruePositive();
 +                        Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
 +                        return indexEntry;
 +                    }
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, in.getPath());
 +            }
 +            finally
 +            {
 +                FileUtils.closeQuietly(in);
 +            }
 +        }
 +
 +        if (op == SSTableReader.Operator.EQ && updateCacheAndStats)
 +            bloomFilterTracker.addFalsePositive();
 +        Tracing.trace("Partition index lookup complete (bloom filter false positive) for sstable {}", descriptor.generation);
 +        return null;
 +    }
 +
 +}
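
A note on the Operator contract the getPosition() loop above leans on: op.apply()
translates a raw compareTo() result (index key vs. searched key) into one of three
outcomes. The sketch below is illustrative only -- the enum name is made up and the
bodies are not quoted from this patch -- but it matches how the loop interprets the
return value: 0 means the position satisfies the op, > 0 means keep scanning
forward, and < 0 means we have passed the target and should bail out with null.

    enum LookupOp
    {
        // satisfied only on an exact match; once the index key passes the
        // searched key (comparison > 0), apply() goes negative and we give up
        EQ { public int apply(int comparison) { return -comparison; } },
        // satisfied by the first index key >= the searched key
        GE { public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; } },
        // satisfied by the first index key strictly > the searched key
        GT { public int apply(int comparison) { return comparison > 0 ? 0 : 1; } };

        public abstract int apply(int comparison);
    }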

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 7e3c877,0000000..73a5d76
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@@ -1,300 -1,0 +1,354 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Iterator;
- import java.util.List;
++import java.util.*;
 +
 +import com.google.common.collect.AbstractIterator;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.RowIndexEntry;
 +import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.columniterator.IColumnIteratorFactory;
 +import org.apache.cassandra.db.columniterator.LazyColumnIterator;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.Bounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
++import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.RandomAccessReader;
 +import org.apache.cassandra.utils.ByteBufferUtil;
++import org.apache.cassandra.utils.Pair;
 +
- public class BigTableScanner implements ICompactionScanner
++public class BigTableScanner implements ISSTableScanner
 +{
 +    protected final RandomAccessReader dfile;
 +    protected final RandomAccessReader ifile;
 +    public final SSTableReader sstable;
 +
 +    private final Iterator<AbstractBounds<RowPosition>> rangeIterator;
 +    private AbstractBounds<RowPosition> currentRange;
 +
 +    private final DataRange dataRange;
 +    private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected Iterator<OnDiskAtomIterator> iterator;
 +
 ++    // We can race with deletion of the sstable during compaction.  If its reference count has
 ++    // already dropped to 0, skip it and return an empty scanner instead.
++    public static ISSTableScanner getScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
++    {
++        return sstable.acquireReference()
++                ? new BigTableScanner(sstable, dataRange, limiter)
++                : new BigTableScanner.EmptySSTableScanner(sstable.getFilename());
 ++    }
 ++
++    public static ISSTableScanner getScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
++    {
 ++        // We want to avoid allocating an SSTableScanner if the ranges don't overlap the sstable (#5249)
++        List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(Range.normalize(tokenRanges));
++        if (positions.isEmpty() || !sstable.acquireReference())
++            return new EmptySSTableScanner(sstable.getFilename());
++
++        return new BigTableScanner(sstable, tokenRanges, limiter);
++    }
++
 +    /**
 +     * @param sstable SSTable to scan; must not be null
 +     * @param dataRange a single range to scan; must not be null
 +     * @param limiter background i/o RateLimiter; may be null
 +     */
-     public BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
++    private BigTableScanner(SSTableReader sstable, DataRange dataRange, RateLimiter limiter)
 +    {
 +        assert sstable != null;
-         sstable.acquireReference();
 +
 +        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
 +        this.ifile = sstable.openIndexReader();
 +        this.sstable = sstable;
 +        this.dataRange = dataRange;
 +        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
 +
 +        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(2);
 +        if (dataRange.isWrapAround() && !dataRange.stopKey().isMinimum())
 +        {
 +            // split the wrapping range into two parts: 1) the part that starts at the beginning of the sstable, and
 +            // 2) the part that comes before the wrap-around
 +            boundsList.add(new Bounds<>(sstable.partitioner.getMinimumToken().minKeyBound(), dataRange.stopKey()));
 +            boundsList.add(new Bounds<>(dataRange.startKey(), sstable.partitioner.getMinimumToken().maxKeyBound()));
 +        }
 +        else
 +        {
 +            boundsList.add(new Bounds<>(dataRange.startKey(), dataRange.stopKey()));
 +        }
 +        this.rangeIterator = boundsList.iterator();
 +    }
 +
 +    /**
 +     * @param sstable SSTable to scan; must not be null
 +     * @param tokenRanges A set of token ranges to scan
 +     * @param limiter background i/o RateLimiter; may be null
 +     */
-     public BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
++    private BigTableScanner(SSTableReader sstable, Collection<Range<Token>> tokenRanges, RateLimiter limiter)
 +    {
 +        assert sstable != null;
-         sstable.acquireReference();
 +
 +        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
 +        this.ifile = sstable.openIndexReader();
 +        this.sstable = sstable;
 +        this.dataRange = null;
 +        this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata);
 +
 +        List<Range<Token>> normalized = Range.normalize(tokenRanges);
 +        List<AbstractBounds<RowPosition>> boundsList = new ArrayList<>(normalized.size());
 +        for (Range<Token> range : normalized)
 +            boundsList.add(new Range<RowPosition>(range.left.maxKeyBound(), range.right.maxKeyBound()));
 +
 +        this.rangeIterator = boundsList.iterator();
 +    }
 +
 +    private void seekToCurrentRangeStart()
 +    {
 +        if (currentRange.left.isMinimum())
 +            return;
 +
 +        long indexPosition = sstable.getIndexScanPosition(currentRange.left);
 +        // -1 means the key is before everything in the sstable. So just start from the beginning.
 +        if (indexPosition == -1)
 +        {
 +            // Note: this method shouldn't assume we're at the start of the sstable already (see #6638) and
 +            // the seeks are no-ops anyway if we are.
 +            ifile.seek(0);
 +            dfile.seek(0);
 +            return;
 +        }
 +
 +        ifile.seek(indexPosition);
 +        try
 +        {
 +            while (!ifile.isEOF())
 +            {
 +                indexPosition = ifile.getFilePointer();
 +                DecoratedKey indexDecoratedKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                int comparison = indexDecoratedKey.compareTo(currentRange.left);
 +                // because our range start may be inclusive or exclusive, we also need to check contains()
 +                // instead of relying on (comparison >= 0) alone
 +                if (comparison > 0 || currentRange.contains(indexDecoratedKey))
 +                {
 +                    // Found, just read the dataPosition and seek into index and data files
 +                    long dataPosition = ifile.readLong();
 +                    ifile.seek(indexPosition);
 +                    dfile.seek(dataPosition);
 +                    break;
 +                }
 +                else
 +                {
 +                    RowIndexEntry.Serializer.skip(ifile);
 +                }
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            sstable.markSuspect();
 +            throw new CorruptSSTableException(e, sstable.getFilename());
 +        }
 +    }
 +
 +    public void close() throws IOException
 +    {
 +        FileUtils.close(dfile, ifile);
 +        sstable.releaseReference();
 +    }
 +
 +    public long getLengthInBytes()
 +    {
 +        return dfile.length();
 +    }
 +
 +    public long getCurrentPosition()
 +    {
 +        return dfile.getFilePointer();
 +    }
 +
 +    public String getBackingFiles()
 +    {
 +        return sstable.toString();
 +    }
 +
 +    public boolean hasNext()
 +    {
 +        if (iterator == null)
 +            iterator = createIterator();
 +        return iterator.hasNext();
 +    }
 +
 +    public OnDiskAtomIterator next()
 +    {
 +        if (iterator == null)
 +            iterator = createIterator();
 +        return iterator.next();
 +    }
 +
 +    public void remove()
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
 +    private Iterator<OnDiskAtomIterator> createIterator()
 +    {
 +        return new KeyScanningIterator();
 +    }
 +
 +    protected class KeyScanningIterator extends AbstractIterator<OnDiskAtomIterator>
 +    {
 +        private DecoratedKey nextKey;
 +        private RowIndexEntry nextEntry;
 +        private DecoratedKey currentKey;
 +        private RowIndexEntry currentEntry;
 +
 +        protected OnDiskAtomIterator computeNext()
 +        {
 +            try
 +            {
 +                if (nextEntry == null)
 +                {
 +                    do
 +                    {
 +                        // we're starting the first range or we just passed the end of the previous range
 +                        if (!rangeIterator.hasNext())
 +                            return endOfData();
 +
 +                        currentRange = rangeIterator.next();
 +                        seekToCurrentRangeStart();
 +
 +                        if (ifile.isEOF())
 +                            return endOfData();
 +
 +                        currentKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                        currentEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
 +                    } while (!currentRange.contains(currentKey));
 +                }
 +                else
 +                {
 +                    // we're in the middle of a range
 +                    currentKey = nextKey;
 +                    currentEntry = nextEntry;
 +                }
 +
 +                long readEnd;
 +                if (ifile.isEOF())
 +                {
 +                    nextEntry = null;
 +                    nextKey = null;
 +                    readEnd = dfile.length();
 +                }
 +                else
 +                {
 +                    // we need the position of the start of the next key, regardless of whether it falls in the current range
 +                    nextKey = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(ifile));
 +                    nextEntry = rowIndexEntrySerializer.deserialize(ifile, sstable.descriptor.version);
 +                    readEnd = nextEntry.position;
 +
 +                    if (!currentRange.contains(nextKey))
 +                    {
 +                        nextKey = null;
 +                        nextEntry = null;
 +                    }
 +                }
 +
 +                if (dataRange == null || dataRange.selectsFullRowFor(currentKey.getKey()))
 +                {
 +                    dfile.seek(currentEntry.position + currentEntry.headerOffset());
 +                    ByteBufferUtil.readWithShortLength(dfile); // key
 +                    return new SSTableIdentityIterator(sstable, dfile, currentKey);
 +                }
 +
 +                return new LazyColumnIterator(currentKey, new IColumnIteratorFactory()
 +                {
 +                    public OnDiskAtomIterator create()
 +                    {
 +                        return dataRange.columnFilter(currentKey.getKey()).getSSTableColumnIterator(sstable, dfile, currentKey, currentEntry);
 +                    }
 +                });
 +
 +            }
 +            catch (IOException e)
 +            {
 +                sstable.markSuspect();
 +                throw new CorruptSSTableException(e, sstable.getFilename());
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return getClass().getSimpleName() + "(" +
 +               "dfile=" + dfile +
 +               " ifile=" + ifile +
 +               " sstable=" + sstable +
 +               ")";
 +    }
++
++    public static class EmptySSTableScanner implements ISSTableScanner
++    {
++        private final String filename;
++
++        public EmptySSTableScanner(String filename)
++        {
++            this.filename = filename;
++        }
++
++        public long getLengthInBytes()
++        {
++            return 0;
++        }
++
++        public long getCurrentPosition()
++        {
++            return 0;
++        }
++
++        public String getBackingFiles()
++        {
++            return filename;
++        }
++
++        public boolean hasNext()
++        {
++            return false;
++        }
++
++        public OnDiskAtomIterator next()
++        {
++            return null;
++        }
++
++        public void close() throws IOException { }
++
++        public void remove() { }
++    }
 ++
 +}
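
The static factories above are the heart of the CASSANDRA-8399 fix: a BigTableScanner
is only constructed once its sstable reference has actually been acquired, and a
caller that loses the race gets an EmptySSTableScanner instead. For that to be safe,
reference acquisition must refuse to resurrect a reader whose count has already hit
zero. A minimal sketch of such a guard, assuming the count lives in an AtomicInteger
field named references, as it does in the SSTableReader further down this digest:

    public boolean acquireReference()
    {
        while (true)
        {
            int n = references.get();
            // 0 means the reader is already released and its files may be
            // deleted at any moment; never bump the count back up from 0
            if (n <= 0)
                return false;
            if (references.compareAndSet(n, n + 1))
                return true;
        }
    }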

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/SSTableExport.java
index 3a04a81,22aebdb..fa6b973
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@@ -22,8 -22,6 +22,7 @@@ import java.io.IOException
  import java.io.PrintStream;
  import java.util.*;
  
- import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.commons.cli.*;
  
  import org.apache.cassandra.config.CFMetaData;
@@@ -320,10 -318,10 +319,10 @@@ public class SSTableExpor
          Set<String> excludeSet = new HashSet<String>();
  
          if (excludes != null)
 -            excludeSet = new HashSet<String>(Arrays.asList(excludes));
 +            excludeSet = new HashSet<>(Arrays.asList(excludes));
  
          SSTableIdentityIterator row;
-         ICompactionScanner scanner = reader.getScanner();
+         ISSTableScanner scanner = reader.getScanner();
          try
          {
              outs.println("[");
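
The try block here matters more under the new scheme: getScanner() now hands back a
scanner that owns a reference on the sstable, and close() is what releases it. A
condensed sketch of the consumption pattern (assuming the enclosing method declares
throws IOException; the row serialization elided here is SSTableExport's own):

    ISSTableScanner scanner = reader.getScanner();
    try
    {
        while (scanner.hasNext())
        {
            SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
            // ... serialize the row to the output stream ...
        }
    }
    finally
    {
        scanner.close(); // releases the reference acquired by getScanner()
    }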

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 2396acb,a09d8b4..63a49de
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -46,13 -37,15 +46,13 @@@ import org.apache.cassandra.db.ColumnFa
  import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.db.Mutation;
 -import org.apache.cassandra.dht.BytesToken;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
- import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+ import org.apache.cassandra.io.sstable.*;
 -import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.junit.After;
 -import org.junit.Test;
  
  import com.google.common.collect.Iterables;
  
@@@ -207,38 -150,14 +207,38 @@@ public class AntiCompactionTes
          List<Range<Token>> ranges = Arrays.asList(range);
  
          SSTableReader.acquireReferences(sstables);
 -        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
 -
 -        assertThat(store.getSSTables().size(), is(1));
 -        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
 -        assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
 -        assertThat(store.getDataTracker().getCompacting().size(), is(0));
 +        long repairedAt = 1000;
 +        CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
 +        /*
 +        Anticompaction will anti-compact the 10 sstables, but two at a time,
 +        so there will be no net change in the number of sstables
 +         */
 +        assertEquals(10, store.getSSTables().size());
 +        int repairedKeys = 0;
 +        int nonRepairedKeys = 0;
 +        for (SSTableReader sstable : store.getSSTables())
 +        {
-             ICompactionScanner scanner = sstable.getScanner();
++            ISSTableScanner scanner = sstable.getScanner();
 +            while (scanner.hasNext())
 +            {
 +                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
 +                if (sstable.isRepaired())
 +                {
 +                    assertTrue(range.contains(row.getKey().getToken()));
 +                    assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
 +                    repairedKeys++;
 +                }
 +                else
 +                {
 +                    assertFalse(range.contains(row.getKey().getToken()));
 +                    assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
 +                    nonRepairedKeys++;
 +                }
 +            }
 +        }
 +        assertEquals(40, repairedKeys);
 +        assertEquals(60, nonRepairedKeys);
      }
 -
      @Test
      public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index faf3808,4659b5c..a6ee3f9
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@@ -189,11 -158,11 +189,11 @@@ public class CompactionsTes
          // check that the shadowed column is gone
          SSTableReader sstable = cfs.getSSTables().iterator().next();
          Range keyRange = new Range<RowPosition>(key, sstable.partitioner.getMinimumToken().maxKeyBound());
-         ICompactionScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange));
+         ISSTableScanner scanner = sstable.getScanner(DataRange.forKeyRange(keyRange));
          OnDiskAtomIterator iter = scanner.next();
          assertEquals(key, iter.getKey());
 -        assert iter.next() instanceof RangeTombstone;
 -        assert !iter.hasNext();
 +        assertTrue(iter.next() instanceof RangeTombstone);
 +        assertFalse(iter.hasNext());
      }
  
      @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index d49d8bb,4c2236b..1eca4e6
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -36,15 -31,11 +36,16 @@@ import org.junit.runner.RunWith
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.db.*;
 +import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.notifications.SSTableAddedNotification;
  import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
  import org.apache.cassandra.repair.RepairJobDesc;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
index 924c4b5,678601b..c56c910
--- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
@@@ -29,11 -27,10 +29,12 @@@ import org.junit.runner.RunWith
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
  import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 2a0c3e6,03b5553..647a67b
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@@ -59,10 -56,9 +59,9 @@@ import org.apache.cassandra.db.Row
  import org.apache.cassandra.db.RowPosition;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
  import org.apache.cassandra.db.compaction.CompactionManager;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
  import org.apache.cassandra.db.composites.Composites;
  import org.apache.cassandra.dht.LocalPartitioner;
 -import org.apache.cassandra.dht.LocalToken;
 +import org.apache.cassandra.dht.LocalPartitioner.LocalToken;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.io.util.FileDataInput;
@@@ -370,7 -346,7 +369,7 @@@ public class SSTableReaderTes
          boolean foundScanner = false;
          for (SSTableReader s : store.getSSTables())
          {
-             ICompactionScanner scanner = s.getScanner(new Range<Token>(t(0), t(1)), null);
 -            ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1), s.partitioner), null);
++            ISSTableScanner scanner = s.getScanner(new Range<Token>(t(0), t(1)), null);
              scanner.next(); // throws exception pre 5407
              foundScanner = true;
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 5eae831,ecf97c3..24a0091
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -41,14 -37,10 +40,13 @@@ import org.apache.cassandra.db.Mutation
  import org.apache.cassandra.db.compaction.AbstractCompactedRow;
  import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
  import org.apache.cassandra.db.compaction.CompactionController;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
  import org.apache.cassandra.db.compaction.LazilyCompactedRow;
  import org.apache.cassandra.db.compaction.OperationType;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.db.compaction.SSTableSplitter;
 -import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.metrics.StorageMetrics;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.ByteBufferUtil;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
index d4053eb,ff60481..9da895e
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
@@@ -27,15 -25,12 +27,14 @@@ import org.junit.Test
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
  import org.apache.cassandra.dht.Bounds;
 -import org.apache.cassandra.dht.BytesToken;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
  import static org.junit.Assert.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 4f3d22a,b9a3821..00f07ff
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@@ -25,9 -25,6 +25,8 @@@ import java.util.*
  
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
  import org.apache.cassandra.service.ActiveRepairService;
  import org.apache.cassandra.utils.ByteBufferUtil;
  


[4/5] cassandra git commit: add missing file

Posted by jm...@apache.org.
add missing file


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ffa80673
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ffa80673
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ffa80673

Branch: refs/heads/trunk
Commit: ffa806733e7492d3d4b54957f911af501e043df9
Parents: 1fec4a4
Author: Joshua McKenzie <jm...@apache.org>
Authored: Tue Dec 16 15:06:02 2014 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Tue Dec 16 15:06:02 2014 -0600

----------------------------------------------------------------------
 .../cassandra/io/sstable/ISSTableScanner.java   | 34 ++++++++++++++++++++
 1 file changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ffa80673/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
new file mode 100644
index 0000000..b80bd87
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+
+/**
+ * An ISSTableScanner is an abstraction allowing multiple SSTableScanners to be
+ * chained together under the hood.  See LeveledCompactionStrategy.getScanners.
+ */
+public interface ISSTableScanner extends CloseableIterator<OnDiskAtomIterator>
+{
+    public long getLengthInBytes();
+    public long getCurrentPosition();
+    public String getBackingFiles();
+}
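
Since the interface's stated purpose is to let several scanners be chained behind
one handle, here is a hypothetical sketch of such a wrapper. The name ChainedScanner
and the byte-accounting scheme are made up for illustration; the real chaining lives
in LeveledCompactionStrategy's own scanner.

    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.Collection;
    import java.util.Deque;
    import java.util.NoSuchElementException;

    import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
    import org.apache.cassandra.io.sstable.ISSTableScanner;
    import org.apache.cassandra.io.util.FileUtils;

    public class ChainedScanner implements ISSTableScanner
    {
        private final Deque<ISSTableScanner> scanners;
        private long bytesFromExhausted = 0;

        public ChainedScanner(Collection<ISSTableScanner> scanners)
        {
            this.scanners = new ArrayDeque<>(scanners);
        }

        public boolean hasNext()
        {
            while (!scanners.isEmpty())
            {
                if (scanners.peekFirst().hasNext())
                    return true;
                // fold the exhausted scanner's length into our running
                // position before releasing it
                ISSTableScanner done = scanners.removeFirst();
                bytesFromExhausted += done.getLengthInBytes();
                FileUtils.closeQuietly(done);
            }
            return false;
        }

        public OnDiskAtomIterator next()
        {
            if (!hasNext())
                throw new NoSuchElementException();
            return scanners.peekFirst().next();
        }

        public long getLengthInBytes()
        {
            long total = bytesFromExhausted;
            for (ISSTableScanner scanner : scanners)
                total += scanner.getLengthInBytes();
            return total;
        }

        public long getCurrentPosition()
        {
            return bytesFromExhausted
                   + (scanners.isEmpty() ? 0 : scanners.peekFirst().getCurrentPosition());
        }

        public String getBackingFiles()
        {
            StringBuilder sb = new StringBuilder();
            for (ISSTableScanner scanner : scanners)
                sb.append(scanner.getBackingFiles()).append(' ');
            return sb.toString().trim();
        }

        public void close() throws IOException
        {
            for (ISSTableScanner scanner : scanners)
                scanner.close();
            scanners.clear();
        }

        public void remove()
        {
            throw new UnsupportedOperationException();
        }
    }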


[5/5] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by jm...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/32ac6af2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/32ac6af2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/32ac6af2

Branch: refs/heads/trunk
Commit: 32ac6af2ba936c23e61d2ac4ab9cd32175cb5176
Parents: bee53d7 ffa8067
Author: Joshua McKenzie <jm...@apache.org>
Authored: Tue Dec 16 15:06:32 2014 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Tue Dec 16 15:06:32 2014 -0600

----------------------------------------------------------------------

----------------------------------------------------------------------



[3/5] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by jm...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
	src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
	src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
	src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
	src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
	src/java/org/apache/cassandra/tools/SSTableExport.java
	test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
	test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
	test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
	test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
	test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
	test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
	test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
	test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bee53d72
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bee53d72
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bee53d72

Branch: refs/heads/trunk
Commit: bee53d72a530aab2949d12bd9d2320b76811c85a
Parents: e73fccd 1fec4a4
Author: Joshua McKenzie <jm...@apache.org>
Authored: Tue Dec 16 15:03:05 2014 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Tue Dec 16 15:03:05 2014 -0600

----------------------------------------------------------------------
 .../compaction/AbstractCompactionIterable.java  |  7 +-
 .../compaction/AbstractCompactionStrategy.java  |  9 +--
 .../db/compaction/CompactionIterable.java       |  5 +-
 .../db/compaction/CompactionManager.java        | 29 +++-----
 .../db/compaction/ICompactionScanner.java       | 34 ---------
 .../compaction/LeveledCompactionStrategy.java   |  7 +-
 .../compaction/WrappingCompactionStrategy.java  |  4 +-
 .../cassandra/io/sstable/ISSTableScanner.java   | 34 +++++++++
 .../io/sstable/format/SSTableReader.java        | 52 ++------------
 .../io/sstable/format/big/BigTableReader.java   | 15 ++--
 .../io/sstable/format/big/BigTableScanner.java  | 74 +++++++++++++++++---
 .../apache/cassandra/tools/SSTableExport.java   |  3 +-
 .../db/compaction/AntiCompactionTest.java       |  6 +-
 .../db/compaction/CompactionsTest.java          |  2 +-
 .../LeveledCompactionStrategyTest.java          |  5 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |  3 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |  3 +-
 .../io/sstable/SSTableRewriterTest.java         | 22 +++---
 .../io/sstable/SSTableScannerTest.java          | 17 +++--
 .../cassandra/io/sstable/SSTableUtils.java      |  5 +-
 20 files changed, 168 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1d6d2e1,337657d..0f44b4b
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -34,6 -33,8 +34,7 @@@ import org.apache.cassandra.dht.Range
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.io.sstable.Component;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.io.sstable.SSTableReader;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  
  /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 86918bc,fdcec6e..cd08b81
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@@ -24,7 -24,7 +24,8 @@@ import java.util.List
  import com.google.common.collect.ImmutableList;
  
  import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableFormat;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
  import org.apache.cassandra.utils.CloseableIterator;
  import org.apache.cassandra.utils.MergeIterator;
  
@@@ -40,10 -38,9 +41,10 @@@ public class CompactionIterable extend
          }
      };
  
-     public CompactionIterable(OperationType type, List<ICompactionScanner> scanners, CompactionController controller, SSTableFormat.Type formatType)
 -    public CompactionIterable(OperationType type, List<ISSTableScanner> scanners, CompactionController controller)
++    public CompactionIterable(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, SSTableFormat.Type formatType)
      {
          super(controller, type, scanners);
 +        this.format = formatType.info;
      }
  
      public CloseableIterator<AbstractCompactedRow> iterator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ed875b8,3977d9c..57f35f5
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -20,15 -20,14 +20,7 @@@ package org.apache.cassandra.db.compact
  import java.io.File;
  import java.io.IOException;
  import java.lang.management.ManagementFactory;
--import java.util.ArrayList;
- import java.util.Arrays;
--import java.util.Collection;
--import java.util.Collections;
--import java.util.HashSet;
--import java.util.Iterator;
--import java.util.List;
--import java.util.Map;
--import java.util.Set;
++import java.util.*;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
@@@ -76,9 -71,8 +67,7 @@@ import org.apache.cassandra.db.index.Se
  import org.apache.cassandra.dht.Bounds;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
- import org.apache.cassandra.io.sstable.Descriptor;
- import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
- import org.apache.cassandra.io.sstable.SSTableRewriter;
+ import org.apache.cassandra.io.sstable.*;
 -import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.io.util.FileUtils;
  import org.apache.cassandra.metrics.CompactionMetrics;
  import org.apache.cassandra.repair.Validator;
@@@ -1197,9 -1116,9 +1186,9 @@@ public class CompactionManager implemen
  
      private static class ValidationCompactionIterable extends CompactionIterable
      {
-         public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ICompactionScanner> scanners, int gcBefore)
+         public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ISSTableScanner> scanners, int gcBefore)
          {
 -            super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore));
 +            super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat());
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 99ae9cd,dbb9a13..38db062
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@@ -44,6 -32,8 +44,7 @@@ import org.apache.cassandra.db.columnit
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.io.sstable.SSTableReader;
  
  public class LeveledCompactionStrategy extends AbstractCompactionStrategy
  {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index 32e63bb,84ef97f..bb5580d
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@@ -29,7 -30,8 +30,8 @@@ import org.apache.cassandra.config.CFMe
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.io.sstable.SSTableReader;
  import org.apache.cassandra.notifications.INotification;
  import org.apache.cassandra.notifications.INotificationConsumer;
  import org.apache.cassandra.notifications.SSTableAddedNotification;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
index 0000000,0000000..b80bd87
new file mode 100644
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
@@@ -1,0 -1,0 +1,34 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++package org.apache.cassandra.io.sstable;
++
++import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
++import org.apache.cassandra.utils.CloseableIterator;
++
++/**
++ * An ISSTableScanner is an abstraction allowing multiple SSTableScanners to be
++ * chained together under the hood.  See LeveledCompactionStrategy.getScanners.
++ */
++public interface ISSTableScanner extends CloseableIterator<OnDiskAtomIterator>
++{
++    public long getLengthInBytes();
++    public long getCurrentPosition();
++    public String getBackingFiles();
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bee53d72/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index c0d0abb,0000000..36b94ae
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,1931 -1,0 +1,1891 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Ordering;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 +import com.clearspring.analytics.stream.cardinality.ICardinality;
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.cache.InstrumentingCache;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
- import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 +import org.apache.cassandra.io.compress.CompressedThrottledReader;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.metadata.*;
 +import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 +
 +/**
 + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
 + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
 + */
 +public abstract class SSTableReader extends SSTable
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 +
 +    private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
 +    private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
 +
 +    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            long ts1 = o1.getMaxTimestamp();
 +            long ts2 = o2.getMaxTimestamp();
 +            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
 +        }
 +    };
 +
 +    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return o1.first.compareTo(o2.first);
 +        }
 +    };
 +
 +    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 +
 +    /**
 +     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMillis()) which represents an upper bound
 +     * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
 +     * later than maxDataAge.
 +     *
 +     * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
 +     *
 +     * When a new sstable is flushed, maxDataAge is set to the time of creation.
 +     * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
 +     *
 +     * The age is in milliseconds since epoch and is local to this host.
 +     */
 +    public final long maxDataAge;
 +
 +    public enum OpenReason
 +    {
 +        NORMAL,
 +        EARLY,
 +        METADATA_CHANGE
 +    }
 +
 +    public final OpenReason openReason;
 +
 +    // indexfile and datafile: might be null before a call to load()
 +    protected SegmentedFile ifile;
 +    protected SegmentedFile dfile;
 +
 +    protected IndexSummary indexSummary;
 +    protected IFilter bf;
 +
 +    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
 +
 +    protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 +
 +    protected final AtomicInteger references = new AtomicInteger(1);
 +    // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
 +    // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
 +    protected final AtomicBoolean isCompacted = new AtomicBoolean(false);
 +    protected final AtomicBoolean isSuspect = new AtomicBoolean(false);
 +
 +    // not final since we need to be able to change level on a file.
 +    protected volatile StatsMetadata sstableMetadata;
 +
 +    protected final AtomicLong keyCacheHit = new AtomicLong(0);
 +    protected final AtomicLong keyCacheRequest = new AtomicLong(0);
 +
 +    /**
 +     * To support replacing this sstablereader with another object that represents that same underlying sstable, but with different associated resources,
 +     * we build a linked-list chain of replacement, which we synchronise using a shared object to make maintenance of the list across multiple threads simple.
 +     * On close we check whether any of the closeable resources differ from those of the links on either side of us; any that appear in neither of the adjacent links (if any) are closed.
 +     * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead of us will compare against only still-open resources.
 +     */
 +    protected Object replaceLock = new Object();
 +    protected SSTableReader replacedBy;
 +    private SSTableReader replaces;
 +    private SSTableReader sharesBfWith;
 +    private SSTableDeletingTask deletingTask;
 +    private Runnable runOnClose;
 +
 +    @VisibleForTesting
 +    public RestorableMeter readMeter;
 +    protected ScheduledFuture readMeterSyncFuture;
 +
 +    /**
 +     * Calculate approximate key count.
 +     * If a cardinality estimator is available on all given sstables, this method uses it to estimate
 +     * the key count.
 +     * If not, it falls back to the index summaries.
 +     *
 +     * @param sstables SSTables to calculate the key count for
 +     * @return estimated key count
 +     */
 +    public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
 +    {
 +        long count = -1;
 +
 +        // check if cardinality estimator is available for all SSTables
 +        boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return sstable.descriptor.version.hasNewStatsFile();
 +            }
 +        });
 +
 +        // if it is, load them to estimate key count
 +        if (cardinalityAvailable)
 +        {
 +            boolean failed = false;
 +            ICardinality cardinality = null;
 +            for (SSTableReader sstable : sstables)
 +            {
 +                try
 +                {
 +                    CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
 +                    assert metadata != null : sstable.getFilename();
 +                    if (cardinality == null)
 +                        cardinality = metadata.cardinalityEstimator;
 +                    else
 +                        cardinality = cardinality.merge(metadata.cardinalityEstimator);
 +                }
 +                catch (IOException e)
 +                {
 +                    logger.warn("Reading cardinality from Statistics.db failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +                catch (CardinalityMergeException e)
 +                {
 +                    logger.warn("Cardinality merge failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +            }
 +            if (cardinality != null && !failed)
 +                count = cardinality.cardinality();
 +        }
 +
 +        // if something went wrong above or cardinality is not available, calculate using index summary
 +        if (count < 0)
 +        {
 +            count = 0; // clear the -1 sentinel so the fallback sum is not off by one
 +            for (SSTableReader sstable : sstables)
 +                count += sstable.estimatedKeys();
 +        }
 +        return count;
 +    }
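
The cardinality branch above merges per-sstable HyperLogLog sketches rather than summing per-sstable key estimates, so a key that appears in several sstables is counted once. An illustrative, standalone sketch of that merge-then-count pattern using the stream-lib types Cassandra bundles (the precision parameters and keys here are illustrative, not taken from this patch):

    import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
    import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
    import com.clearspring.analytics.stream.cardinality.ICardinality;

    public class MergeCountSketch
    {
        public static void main(String[] args) throws CardinalityMergeException
        {
            HyperLogLogPlus a = new HyperLogLogPlus(13, 25); // both sketches must share precision settings
            HyperLogLogPlus b = new HyperLogLogPlus(13, 25);
            a.offer("key1");
            a.offer("key2");
            b.offer("key2"); // overlaps with a
            b.offer("key3");

            ICardinality merged = a.merge(b);
            System.out.println(merged.cardinality()); // ~3, not 4: the shared key is deduplicated
        }
    }
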
 +
 +    public static SSTableReader open(Descriptor descriptor) throws IOException
 +    {
 +        CFMetaData metadata;
 +        if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
 +        {
 +            int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
 +            String parentName = descriptor.cfname.substring(0, i);
 +            CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
 +            ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
 +            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
 +        }
 +        else
 +        {
 +            metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
 +        }
 +        return open(descriptor, metadata);
 +    }
 +
 +    public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
 +    {
 +        IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
 +                ? new LocalPartitioner(metadata.getKeyValidator())
 +                : StorageService.getPartitioner();
 +        return open(desc, componentsFor(desc), metadata, p);
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        return open(descriptor, components, metadata, partitioner, true);
 +    }
 +
 +    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
 +    {
 +        return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
 +    }
 +
 +    /**
 +     * Open an SSTable reader to be used in batch mode (such as by sstableloader).
 +     *
 +     * @param descriptor
 +     * @param components
 +     * @param metadata
 +     * @param partitioner
 +     * @return opened SSTableReader
 +     * @throws IOException
 +     */
 +    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // The recorded partitioner can be null, which indicates an older sstable version or no stats available;
 +        // in that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // special implementation of load to use non-pooled SegmentedFile builders
 +        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
 +        SegmentedFile.Builder dbuilder = sstable.compression
 +                ? new CompressedSegmentedFile.Builder(null)
 +                : new BufferedSegmentedFile.Builder();
 +        if (!sstable.loadSummary(ibuilder, dbuilder))
 +            sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
 +        sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
 +        sstable.bf = FilterFactory.AlwaysPresent;
 +
 +        return sstable;
 +    }
 +
 +    private static SSTableReader open(Descriptor descriptor,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      boolean validate) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // The recorded partitioner can be null, which indicates an older sstable version or no stats available;
 +        // in that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // load index and filter
 +        long start = System.nanoTime();
 +        sstable.load(validationMetadata);
 +        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +
 +        if (validate)
 +            sstable.validate();
 +
 +        if (sstable.getKeyCache() != null)
 +            logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 +
 +        return sstable;
 +    }
 +
 +    public static void logOpenException(Descriptor descriptor, IOException e)
 +    {
 +        if (e instanceof FileNotFoundException)
 +            logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
 +        else
 +            logger.error("Corrupt sstable {}; skipped", descriptor, e);
 +    }
 +
 +    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
 +                                                    final CFMetaData metadata,
 +                                                    final IPartitioner partitioner)
 +    {
 +        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 +
 +        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
 +        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
 +        {
 +            Runnable runnable = new Runnable()
 +            {
 +                public void run()
 +                {
 +                    SSTableReader sstable;
 +                    try
 +                    {
 +                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
 +                    }
 +                    catch (IOException ex)
 +                    {
 +                        logger.error("Corrupt sstable {}; skipped", entry, ex);
 +                        return;
 +                    }
 +                    sstables.add(sstable);
 +                }
 +            };
 +            executor.submit(runnable);
 +        }
 +
 +        executor.shutdown();
 +        try
 +        {
 +            executor.awaitTermination(7, TimeUnit.DAYS);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +
 +        return sstables;
 +
 +    }
 +
 +    /**
 +     * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
 +     */
 +    public static SSTableReader internalOpen(Descriptor desc,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      SegmentedFile ifile,
 +                                      SegmentedFile dfile,
 +                                      IndexSummary isummary,
 +                                      IFilter bf,
 +                                      long maxDataAge,
 +                                      StatsMetadata sstableMetadata,
 +                                      OpenReason openReason)
 +    {
 +        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 +
 +        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +
 +        reader.bf = bf;
 +        reader.ifile = ifile;
 +        reader.dfile = dfile;
 +        reader.indexSummary = isummary;
 +
 +        return reader;
 +    }
 +
 +
 +    private static SSTableReader internalOpen(final Descriptor descriptor,
 +                                            Set<Component> components,
 +                                            CFMetaData metadata,
 +                                            IPartitioner partitioner,
 +                                            Long maxDataAge,
 +                                            StatsMetadata sstableMetadata,
 +                                            OpenReason openReason)
 +    {
 +        Factory readerFactory = descriptor.getFormat().getReaderFactory();
 +
 +        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    protected SSTableReader(final Descriptor desc,
 +                            Set<Component> components,
 +                            CFMetaData metadata,
 +                            IPartitioner partitioner,
 +                            long maxDataAge,
 +                            StatsMetadata sstableMetadata,
 +                            OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner);
 +        this.sstableMetadata = sstableMetadata;
 +        this.maxDataAge = maxDataAge;
 +        this.openReason = openReason;
 +
 +        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
 +
 +        deletingTask = new SSTableDeletingTask(this);
 +
 +        // Don't track read rates for tables in the system keyspace.  Also don't track reads for special operations (like early open)
 +        // this is to avoid overflowing the executor queue (see CASSANDRA-8066)
 +        if (SystemKeyspace.NAME.equals(desc.ksname) || openReason != OpenReason.NORMAL)
 +        {
 +            readMeter = null;
 +            readMeterSyncFuture = null;
 +            return;
 +        }
 +
 +        readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +        // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
 +        readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
 +        {
 +            public void run()
 +            {
 +                if (!isCompacted.get())
 +                {
 +                    meterSyncThrottle.acquire();
 +                    SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
 +                }
 +            }
 +        }, 1, 5, TimeUnit.MINUTES);
 +    }
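
The tail of the constructor above pairs a single-threaded scheduler with a process-wide Guava RateLimiter, so thousands of readers can each sync their read meter every five minutes without stampeding system.sstable_activity (the CASSANDRA-8066 concern referenced in the comment). A minimal standalone sketch of that throttled periodic-task pattern, with persist() standing in for the real persistence call:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    import com.google.common.util.concurrent.RateLimiter;

    public class ThrottledSyncSketch
    {
        private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
        private static final RateLimiter throttle = RateLimiter.create(100.0); // cap at 100 syncs/sec overall

        public static void schedule(final Runnable persist)
        {
            syncExecutor.scheduleAtFixedRate(new Runnable()
            {
                public void run()
                {
                    throttle.acquire(); // blocks until a permit is free, smoothing bursts across many tasks
                    persist.run();      // stand-in for persisting a read meter
                }
            }, 1, 5, TimeUnit.MINUTES);
        }
    }
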
 +
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sum += sstable.onDiskLength();
 +        }
 +        return sum;
 +    }
 +
 +    private void tidy(boolean release)
 +    {
 +        if (readMeterSyncFuture != null)
 +            readMeterSyncFuture.cancel(false);
 +
 +        if (references.get() != 0)
 +        {
 +            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
 +        }
 +
 +        synchronized (replaceLock)
 +        {
 +            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
 +
 +            if (replacedBy != null)
 +            {
 +                closeBf = replacedBy.bf != bf;
 +                closeSummary = replacedBy.indexSummary != indexSummary;
 +                closeFiles = replacedBy.dfile != dfile;
 +                // if the replacement sstablereader uses a different path, clean up our paths
 +                deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
 +            }
 +
 +            if (replaces != null)
 +            {
 +                closeBf &= replaces.bf != bf;
 +                closeSummary &= replaces.indexSummary != indexSummary;
 +                closeFiles &= replaces.dfile != dfile;
 +                deleteFiles &= !dfile.path.equals(replaces.dfile.path);
 +            }
 +
 +            if (sharesBfWith != null)
 +            {
 +                closeBf &= sharesBfWith.bf != bf;
 +                closeSummary &= sharesBfWith.indexSummary != indexSummary;
 +                closeFiles &= sharesBfWith.dfile != dfile;
 +                deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path);
 +            }
 +
 +            boolean deleteAll = false;
 +            if (release && isCompacted.get())
 +            {
 +                assert replacedBy == null;
 +                if (replaces != null)
 +                {
 +                    replaces.replacedBy = null;
 +                    replaces.deletingTask = deletingTask;
 +                    replaces.markObsolete();
 +                }
 +                else
 +                {
 +                    deleteAll = true;
 +                }
 +            }
 +            else
 +            {
 +                if (replaces != null)
 +                    replaces.replacedBy = replacedBy;
 +                if (replacedBy != null)
 +                    replacedBy.replaces = replaces;
 +            }
 +
 +            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
 +        }
 +    }
 +
 +    private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
 +    {
 +        if (references.get() != 0)
 +            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
 +
 +        final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
 +        final OpOrder.Barrier barrier;
 +        if (cfs != null)
 +        {
 +            barrier = cfs.readOrdering.newBarrier();
 +            barrier.issue();
 +        }
 +        else
 +            barrier = null;
 +
 +        ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
 +        {
 +            public void run()
 +            {
 +                if (barrier != null)
 +                    barrier.await();
 +                if (closeBf)
 +                    bf.close();
 +                if (closeSummary)
 +                    indexSummary.close();
 +                if (closeFiles)
 +                {
 +                    ifile.cleanup();
 +                    dfile.cleanup();
 +                }
 +                if (runOnClose != null)
 +                    runOnClose.run();
 +                if (deleteAll)
 +                {
 +                    /**
 +                     * Do the OS a favour and suggest (using an fadvise call) that we
 +                     * don't want to see pages of this SSTable in memory anymore.
 +                     *
 +                     * NOTE: We can't use madvise in java because it requires the address of
 +                     * the mapping, so instead we always open a file and run fadvise(fd, 0, 0) on it
 +                     */
 +                    dropPageCache();
 +                    deletingTask.run();
 +                }
 +                else if (deleteFiles)
 +                {
 +                    FileUtils.deleteWithConfirm(new File(dfile.path));
 +                    FileUtils.deleteWithConfirm(new File(ifile.path));
 +                }
 +            }
 +        });
 +    }
 +
 +    public boolean equals(Object that)
 +    {
 +        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
 +    }
 +
 +    public int hashCode()
 +    {
 +        return this.descriptor.hashCode();
 +    }
 +
 +    public String getFilename()
 +    {
 +        return dfile.path;
 +    }
 +
 +    public String getIndexFilename()
 +    {
 +        return ifile.path;
 +    }
 +
 +    public void setTrackedBy(DataTracker tracker)
 +    {
 +        deletingTask.setTracker(tracker);
 +        // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
 +        // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
 +        // here when we know we're being wired into the rest of the server infrastructure.
 +        keyCache = CacheService.instance.keyCache;
 +    }
 +
 +    private void load(ValidationMetadata validation) throws IOException
 +    {
 +        if (metadata.getBloomFilterFpChance() == 1.0)
 +        {
 +            // bf is disabled.
 +            load(false, true);
 +            bf = FilterFactory.AlwaysPresent;
 +        }
 +        else if (!components.contains(Component.FILTER) || validation == null)
 +        {
 +            // bf is enabled, but filter component is missing.
 +            load(true, true);
 +        }
 +        else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
 +        {
 +            // the bf fp chance recorded in the sstable metadata differs from the configured value, so rebuild the filter.
 +            load(true, true);
 +        }
 +        else
 +        {
 +            // bf is enabled and fp chance matches the currently configured value.
 +            load(false, true);
 +            loadBloomFilter();
 +        }
 +    }
 +
 +    /**
 +     * Load bloom filter from Filter.db file.
 +     *
 +     * @throws IOException
 +     */
 +    private void loadBloomFilter() throws IOException
 +    {
 +        DataInputStream stream = null;
 +        try
 +        {
 +            stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
 +            bf = FilterFactory.deserialize(stream, true);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(stream);
 +        }
 +    }
 +
 +    /**
 +     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
 +     * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
 +     *                             avoid persisting it to disk by setting this to false
 +     */
 +    private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
 +    {
 +        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +        SegmentedFile.Builder dbuilder = compression
 +                ? SegmentedFile.getCompressedBuilder()
 +                : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +
 +        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
 +        if (recreateBloomFilter || !summaryLoaded)
 +            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 +
 +        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 +        if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
 +            saveSummary(ibuilder, dbuilder);
 +    }
 +
 +    /**
 +     * Build the index summary (and optionally the bloom filter) by reading through the Index.db file.
 +     *
 +     * @param recreateBloomFilter true if recreate bloom filter
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @param summaryLoaded true if the index summary is already loaded and does not need to be built again
 +     * @throws IOException
 +     */
 +    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            long histogramCount = sstableMetadata.estimatedRowSize.count();
 +            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
 +                    ? histogramCount
 +                    : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
 +
 +            if (recreateBloomFilter)
 +                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
 +
 +            IndexSummaryBuilder summaryBuilder = null;
 +            if (!summaryLoaded)
 +                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
 +
 +            long indexPosition;
 +            RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
 +
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +            {
 +                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
 +                RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
 +                DecoratedKey decoratedKey = partitioner.decorateKey(key);
 +                if (first == null)
 +                    first = decoratedKey;
 +                last = decoratedKey;
 +
 +                if (recreateBloomFilter)
 +                    bf.add(decoratedKey.getKey());
 +
 +                // if summary was already read from disk we don't want to re-populate it using primary index
 +                if (!summaryLoaded)
 +                {
 +                    summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
 +                    ibuilder.addPotentialBoundary(indexPosition);
 +                    dbuilder.addPotentialBoundary(indexEntry.position);
 +                }
 +            }
 +
 +            if (!summaryLoaded)
 +                indexSummary = summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +
 +        first = getMinimalKey(first);
 +        last = getMinimalKey(last);
 +    }
 +
 +    /**
 +     * Load index summary from Summary.db file if it exists.
 +     *
 +     * If the loaded index summary has a different index interval from the current value stored in the schema,
 +     * the Summary.db file will be deleted and this returns false, so that the summary is rebuilt.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @return true if index summary is loaded successfully from Summary.db file.
 +     */
 +    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (!summariesFile.exists())
 +            return false;
 +
 +        DataInputStream iStream = null;
 +        try
 +        {
 +            iStream = new DataInputStream(new FileInputStream(summariesFile));
 +            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
 +            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            ibuilder.deserializeBounds(iStream);
 +            dbuilder.deserializeBounds(iStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
 +            // corrupted; close the stream before deleting the file, then fall back to creating a new summary
 +            FileUtils.closeQuietly(iStream);
 +            FileUtils.deleteWithConfirm(summariesFile);
 +            return false;
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(iStream);
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Save index summary to Summary.db file.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     */
 +    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        saveSummary(ibuilder, dbuilder, indexSummary);
 +    }
 +
 +    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (summariesFile.exists())
 +            FileUtils.deleteWithConfirm(summariesFile);
 +
 +        DataOutputStreamAndChannel oStream = null;
 +        try
 +        {
 +            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
 +            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
 +            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
 +            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
 +            ibuilder.serializeBounds(oStream);
 +            dbuilder.serializeBounds(oStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot save SSTable Summary: ", e);
 +
 +            // the written file may be incomplete; delete it so a fresh summary can be built on the next load.
 +            if (summariesFile.exists())
 +                FileUtils.deleteWithConfirm(summariesFile);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(oStream);
 +        }
 +    }
 +
 +    public void setReplacedBy(SSTableReader replacement)
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +            replacedBy = replacement;
 +            replacement.replaces = this;
 +            replacement.replaceLock = replaceLock;
 +        }
 +    }
 +
 +    /**
 +     * this is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter
 +     *
 +     * note that the reason we don't use replacedBy is that we are not yet actually replaced
 +     *
 +     * @param newReader
 +     */
 +    public void sharesBfWith(SSTableReader newReader)
 +    {
 +        assert openReason.equals(OpenReason.EARLY);
 +        this.sharesBfWith = newReader;
 +    }
 +
 +    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +
 +            if (newStart.compareTo(this.first) > 0)
 +            {
 +                if (newStart.compareTo(this.last) > 0)
 +                {
 +                    this.runOnClose = new Runnable()
 +                    {
 +                        public void run()
 +                        {
 +                            CLibrary.trySkipCache(dfile.path, 0, 0);
 +                            CLibrary.trySkipCache(ifile.path, 0, 0);
 +                            runOnClose.run();
 +                        }
 +                    };
 +                }
 +                else
 +                {
 +                    final long dataStart = getPosition(newStart, Operator.GE).position;
 +                    final long indexStart = getIndexScanPosition(newStart);
 +                    this.runOnClose = new Runnable()
 +                    {
 +                        public void run()
 +                        {
 +                            CLibrary.trySkipCache(dfile.path, 0, dataStart);
 +                            CLibrary.trySkipCache(ifile.path, 0, indexStart);
 +                            runOnClose.run();
 +                        }
 +                    };
 +                }
 +            }
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata,
 +                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
 +            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
 +            replacement.readMeter = this.readMeter;
 +            replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    /**
 +     * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
 +     * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
 +     * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
 +     * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
 +     * @return a new SSTableReader
 +     * @throws IOException
 +     */
 +    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +
 +            int minIndexInterval = metadata.getMinIndexInterval();
 +            int maxIndexInterval = metadata.getMaxIndexInterval();
 +            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 +
 +            IndexSummary newSummary;
 +            long oldSize = bytesOnDisk();
 +
 +            // We have to rebuild the summary from the on-disk primary index in three cases:
 +            // 1. The sampling level went up, so we need to read more entries off disk
 +            // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
 +            //    at full sampling (and consequently at any other sampling level)
 +            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
 +            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
 +            {
 +                newSummary = buildSummaryAtLevel(samplingLevel);
 +            }
 +            else if (samplingLevel < indexSummary.getSamplingLevel())
 +            {
 +                // we can use the existing index summary to make a smaller one
 +                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
 +
 +                SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +                SegmentedFile.Builder dbuilder = compression
 +                        ? SegmentedFile.getCompressedBuilder()
 +                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +                saveSummary(ibuilder, dbuilder, newSummary);
 +            }
 +            else
 +            {
 +                throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
 +                        "no adjustments to min/max_index_interval");
 +            }
 +
 +            long newSize = bytesOnDisk();
 +            StorageMetrics.load.inc(newSize - oldSize);
 +            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata,
 +                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
 +            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
 +            replacement.readMeter = this.readMeter;
 +            replacement.first = this.first;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
 +
 +            long indexPosition;
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +            {
 +                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
 +                RowIndexEntry.Serializer.skip(primaryIndex);
 +            }
 +
 +            return summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +    }
 +
 +    public int getIndexSummarySamplingLevel()
 +    {
 +        return indexSummary.getSamplingLevel();
 +    }
 +
 +    public long getIndexSummaryOffHeapSize()
 +    {
 +        return indexSummary.getOffHeapSize();
 +    }
 +
 +    public int getMinIndexInterval()
 +    {
 +        return indexSummary.getMinIndexInterval();
 +    }
 +
 +    public double getEffectiveIndexInterval()
 +    {
 +        return indexSummary.getEffectiveIndexInterval();
 +    }
 +
 +    public void releaseSummary() throws IOException
 +    {
 +        indexSummary.close();
 +        indexSummary = null;
 +    }
 +
 +    private void validate()
 +    {
 +        if (this.first.compareTo(this.last) > 0)
 +            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
 +    }
 +
 +    /**
 +     * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
 +     * modulo downsampling of the index summary).
 +     */
 +    public long getIndexScanPosition(RowPosition key)
 +    {
 +        return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
 +    }
 +
 +    protected static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
 +    {
 +        if (binarySearchResult == -1)
 +            return -1;
 +        else
 +            return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
 +    }
 +
 +    protected static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
 +    {
 +        if (binarySearchResult < 0)
 +        {
 +            // binary search gives us the first index _greater_ than the key searched for,
 +            // i.e., its insertion position
 +            int greaterThan = (binarySearchResult + 1) * -1;
 +            if (greaterThan == 0)
 +                return -1;
 +            return greaterThan - 1;
 +        }
 +        else
 +        {
 +            return binarySearchResult;
 +        }
 +    }
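
The (binarySearchResult + 1) * -1 step above undoes the usual -(insertionPoint) - 1 encoding that java.util-style binary searches return on a miss; what the summary wants is the index of the last sampled key at or before the target. A worked example against a plain array (values illustrative, not from this patch):

    import java.util.Arrays;

    public class SummarySearchExample
    {
        public static void main(String[] args)
        {
            long[] samples = { 10, 20, 30 };
            int result = Arrays.binarySearch(samples, 25); // miss: insertion point 2, encoded as -(2) - 1 == -3

            int index;
            if (result >= 0)
            {
                index = result;                      // exact hit on a sampled key
            }
            else
            {
                int greaterThan = (result + 1) * -1; // recover the insertion point (first index greater than the key)
                index = greaterThan - 1;             // last sample <= key; -1 means the key precedes all samples
            }
            System.out.println(index);               // prints 1: samples[1] == 20 is the nearest sample at or before 25
        }
    }
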
 +
 +    /**
 +     * Returns the compression metadata for this sstable.
 +     * @throws IllegalStateException if the sstable is not compressed
 +     */
 +    public CompressionMetadata getCompressionMetadata()
 +    {
 +        if (!compression)
 +            throw new IllegalStateException(this + " is not compressed");
 +
 +        CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
 +
 +        //We need the parent cf metadata
 +        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
 +        cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
 +
 +        return cmd;
 +    }
 +
 +    /**
 +     * Returns the amount of memory in bytes used off heap by the compression meta-data.
 +     * @return the amount of memory in bytes used off heap by the compression meta-data
 +     */
 +    public long getCompressionMetadataOffHeapSize()
 +    {
 +        if (!compression)
 +            return 0;
 +
 +        return getCompressionMetadata().offHeapSize();
 +    }
 +
 +    /**
 +     * For testing purposes only.
 +     */
 +    public void forceFilterFailures()
 +    {
 +        bf = FilterFactory.AlwaysPresent;
 +    }
 +
 +    public IFilter getBloomFilter()
 +    {
 +        return bf;
 +    }
 +
 +    public long getBloomFilterSerializedSize()
 +    {
 +        return bf.serializedSize();
 +    }
 +
 +    /**
 +     * Returns the amount of memory in bytes used off heap by the bloom filter.
 +     * @return the amount of memory in bytes used off heap by the bloom filter
 +     */
 +    public long getBloomFilterOffHeapSize()
 +    {
 +        return bf.offHeapSize();
 +    }
 +
 +    /**
 +     * @return An estimate of the number of keys in this SSTable based on the index summary.
 +     */
 +    public long estimatedKeys()
 +    {
 +        return indexSummary.getEstimatedKeyCount();
 +    }
 +
 +    /**
 +     * @param ranges
 +     * @return An estimate of the number of keys for given ranges in this SSTable.
 +     */
 +    public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
 +    {
 +        long sampleKeyCount = 0;
 +        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
 +        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
 +            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
 +
 +        // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
 +        long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
 +        return Math.max(1, estimatedKeys);
 +    }
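
To make that adjustment concrete with illustrative numbers: at min_index_interval = 128 and BASE_SAMPLING_LEVEL = 128, a summary downsampled to samplingLevel = 64 keeps half its entries, so each surviving entry stands in for roughly 128 * 128 / 64 = 256 keys; 10 sampled entries falling inside the requested ranges would therefore estimate about 2,560 keys.
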
 +
 +    /**
 +     * Returns the number of entries in the IndexSummary.  At full sampling, this is approximately 1/INDEX_INTERVALth of
 +     * the keys in this SSTable.
 +     */
 +    public int getIndexSummarySize()
 +    {
 +        return indexSummary.size();
 +    }
 +
 +    /**
 +     * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
 +     */
 +    public int getMaxIndexSummarySize()
 +    {
 +        return indexSummary.getMaxNumberOfEntries();
 +    }
 +
 +    /**
 +     * Returns the key for the index summary entry at `index`.
 +     */
 +    public byte[] getIndexSummaryKey(int index)
 +    {
 +        return indexSummary.getKey(index);
 +    }
 +
 +    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Integer,Integer>> positions = new ArrayList<>();
 +
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            RowPosition leftPosition = range.left.maxKeyBound();
 +            RowPosition rightPosition = range.right.maxKeyBound();
 +
 +            int left = summary.binarySearch(leftPosition);
 +            if (left < 0)
 +                left = (left + 1) * -1;
 +            else
 +                // ranges are start-exclusive on the left, so skip past an exact match
 +                left = left + 1;
 +            if (left == summary.size())
 +                // left is past the end of the sampling
 +                continue;
 +
 +            int right = Range.isWrapAround(range.left, range.right)
 +                    ? summary.size() - 1
 +                    : summary.binarySearch(rightPosition);
 +            if (right < 0)
 +            {
 +                // ranges are end-inclusive, so we use the index before the one binarySearch gives us,
 +                // since that will be the last index we return
 +                right = (right + 1) * -1;
 +                if (right == 0)
 +                    // means the first key is already strictly greater than the right bound
 +                    continue;
 +                right--;
 +            }
 +
 +            if (left > right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
 +    {
 +        final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
 +
 +        if (indexRanges.isEmpty())
 +            return Collections.emptyList();
 +
 +        return new Iterable<DecoratedKey>()
 +        {
 +            public Iterator<DecoratedKey> iterator()
 +            {
 +                return new Iterator<DecoratedKey>()
 +                {
 +                    private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
 +                    private Pair<Integer, Integer> current;
 +                    private int idx;
 +
 +                    public boolean hasNext()
 +                    {
 +                        if (current == null || idx > current.right)
 +                        {
 +                            if (rangeIter.hasNext())
 +                            {
 +                                current = rangeIter.next();
 +                                idx = current.left;
 +                                return true;
 +                            }
 +                            return false;
 +                        }
 +
 +                        return true;
 +                    }
 +
 +                    public DecoratedKey next()
 +                    {
 +                        byte[] bytes = indexSummary.getKey(idx++);
 +                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
 +                    }
 +
 +                    public void remove()
 +                    {
 +                        throw new UnsupportedOperationException();
 +                    }
 +                };
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
 +     * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
 +     */
 +    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Long,Long>> positions = new ArrayList<>();
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            AbstractBounds<RowPosition> keyRange = range.toRowBounds();
 +            RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT);
 +            long left = idxLeft == null ? -1 : idxLeft.position;
 +            if (left == -1)
 +                // left is past the end of the file
 +                continue;
 +            RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT);
 +            long right = idxRight == null ? -1 : idxRight.position;
 +            if (right == -1 || Range.isWrapAround(range.left, range.right))
 +                // right is past the end of the file, or it wraps
 +                right = uncompressedLength();
 +            if (left == right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public void invalidateCacheKey(DecoratedKey key)
 +    {
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        keyCache.remove(cacheKey);
 +    }
 +
 +    public void cacheKey(DecoratedKey key, RowIndexEntry info)
 +    {
 +        CachingOptions caching = metadata.getCaching();
 +
 +        if (!caching.keyCache.isEnabled()
 +                || keyCache == null
 +                || keyCache.getCapacity() == 0)
 +        {
 +            return;
 +        }
 +
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
 +        keyCache.put(cacheKey, info);
 +    }
 +
 +    public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
 +    {
 +        return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
 +    }
 +
 +    protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
 +    {
 +        if (keyCache != null && keyCache.getCapacity() > 0)
 +        {
 +            if (updateStats)
 +            {
 +                RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
 +                keyCacheRequest.incrementAndGet();
 +                if (cachedEntry != null)
 +                    keyCacheHit.incrementAndGet();
 +                return cachedEntry;
 +            }
 +            else
 +            {
 +                return keyCache.getInternal(unifiedKey);
 +            }
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Get position updating key cache and stats.
 +     * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
 +     */
 +    public RowIndexEntry getPosition(RowPosition key, Operator op)
 +    {
 +        return getPosition(key, op, true);
 +    }
 +
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 +     * allow key selection by token bounds, but only if op != EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    public abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats);
 +
 +    // Corresponds to a by-name column query
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
 +
 +    // Corresponds to a slice query
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
 +
 +    /**
 +     * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
 +     */
 +    public DecoratedKey firstKeyBeyond(RowPosition token)
 +    {
 +        long sampledPosition = getIndexScanPosition(token);
 +        if (sampledPosition == -1)
 +            sampledPosition = 0;
 +
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext())
 +        {
 +            FileDataInput in = segments.next();
 +            try
 +            {
 +                while (!in.isEOF())
 +                {
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                    if (indexDecoratedKey.compareTo(token) > 0)
 +                        return indexDecoratedKey;
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, in.getPath());
 +            }
 +            finally
 +            {
 +                FileUtils.closeQuietly(in);
 +            }
 +        }
 +
 +        return null;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the data for this SSTable. For
 +     * compressed files, this is not the same thing as the on disk size (see
 +     * onDiskLength())
 +     */
 +    public long uncompressedLength()
 +    {
 +        return dfile.length;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the on disk size for this SSTable. For
 +     * compressed files, this is not the same thing as the data length (see
 +     * length())
 +     */
 +    public long onDiskLength()
 +    {
 +        return dfile.onDiskLength;
 +    }
 +
 +    public boolean acquireReference()
 +    {
 +        while (true)
 +        {
 +            int n = references.get();
 +            if (n <= 0)
 +                return false;
 +            if (references.compareAndSet(n, n + 1))
 +                return true;
 +        }
 +    }
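
acquireReference() is the compare-and-swap loop that lets readers refuse resurrection: once the count has reached zero the sstable may already be mid-tidy, so the method returns false rather than re-incrementing. A sketch of the intended acquire/release discipline (illustrative, not code from this patch):

    // read from an sstable only while holding a reference to it
    static void readSafely(SSTableReader sstable)
    {
        if (!sstable.acquireReference())
            return; // already fully released; the caller should obtain a newer view of this sstable

        try
        {
            // ... perform reads; files cannot be tidied or deleted while the reference is held ...
        }
        finally
        {
            sstable.releaseReference(); // the final release of a compacted reader triggers cleanup
        }
    }
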
 +
 +    @VisibleForTesting
 +    public int referenceCount()
 +    {
 +        return references.get();
 +    }
 +
 +    /**
 +     * Release reference to this SSTableReader.
 +     * If no one is referring to this SSTable and it is marked as compacted,
 +     * all resources are cleaned up and the files are eventually deleted.
 +     */
 +    public void releaseReference()
 +    {
 +        if (references.decrementAndGet() == 0)
 +            tidy(true);
 +        assert references.get() >= 0 : "Reference counter " +  references.get() + " for " + dfile.path;
 +    }
 +
 +    /**
 +     * Mark the sstable as obsolete, i.e., compacted into newer sstables.
 +     *
 +     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
 +     * except for threads holding a reference.
 +     *
 +     * @return true if this is the first time the file was marked obsolete.  Calling this
 +     * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
 +     */
 +    public boolean markObsolete()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} compacted", getFilename());
 +
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null : getFilename();
 +        }
 +        return !isCompacted.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedCompacted()
 +    {
 +        return isCompacted.get();
 +    }
 +
 +    public void markSuspect()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
 +
 +        isSuspect.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedSuspect()
 +    {
 +        return isSuspect.get();
 +    }
 +
 +
 +    /**
 +     * I/O SSTableScanner
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
-     public ICompactionScanner getScanner()
++    public ISSTableScanner getScanner()
 +    {
 +        return getScanner((RateLimiter) null);
 +    }
 +
-     public ICompactionScanner getScanner(RateLimiter limiter)
++    public ISSTableScanner getScanner(RateLimiter limiter)
 +    {
 +        return getScanner(DataRange.allData(partitioner), limiter);
 +    }
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
-     public ICompactionScanner getScanner(DataRange dataRange)
++    public ISSTableScanner getScanner(DataRange dataRange)
 +    {
 +        return getScanner(dataRange, null);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined range of tokens.
 +     *
 +     * @param range the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
-     public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter)
++    public ISSTableScanner getScanner(Range<Token> range, RateLimiter limiter)
 +    {
 +        if (range == null)
 +            return getScanner(limiter);
 +        return getScanner(Collections.singletonList(range), limiter);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the ranges of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
-     public abstract ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
++    public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
-     public abstract ICompactionScanner getScanner(DataRange dataRange, RateLimiter limiter);
++    public abstract ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter);
 +
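
Since CASSANDRA-8399 ties scanner lifetime to the reader's reference count, a
caller-side sketch of the expected discipline may help (the method below is
illustrative): whichever getScanner overload is used, close() must run on every
path, because that is what releases the reference the scanner holds on the reader.

    void scanAllRows(SSTableReader sstable) throws IOException
    {
        ISSTableScanner scanner = sstable.getScanner();
        try
        {
            while (scanner.hasNext())
            {
                OnDiskAtomIterator row = scanner.next();
                // ... consume the row ...
            }
        }
        finally
        {
            scanner.close(); // releases the reference the scanner holds on this reader
        }
    }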
 +
 +
 +    public FileDataInput getFileDataInput(long position)
 +    {
 +        return dfile.getSegment(position);
 +    }
 +
 +    /**
 +     * Tests if the sstable contains data newer than the given age param (measured against this host's current time in millis).
 +     * This works in conjunction with maxDataAge, which is an upper bound on the creation time of data in this sstable.
 +     * @param age The age to compare against the maxDataAge of this sstable, in milliseconds since the epoch on this host
 +     * @return True iff this sstable contains data that's newer than the given age parameter.
 +     */
 +    public boolean newSince(long age)
 +    {
 +        return maxDataAge > age;
 +    }
 +
 +    public void createLinks(String snapshotDirectoryPath)
 +    {
 +        for (Component component : components)
 +        {
 +            File sourceFile = new File(descriptor.filenameFor(component));
 +            File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
 +            FileUtils.createHardLink(sourceFile, targetLink);
 +        }
 +    }
 +
 +    public boolean isRepaired()
 +    {
 +        return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
 +    }
 +
 +    public SSTableReader getCurrentReplacement()
 +    {
 +        synchronized (replaceLock)
 +        {
 +            SSTableReader cur = this, next = replacedBy;
 +            while (next != null)
 +            {
 +                cur = next;
 +                next = next.replacedBy;
 +            }
 +            return cur;
 +        }
 +    }
 +
 +    /**
 +     * TODO: Move someplace reusable
 +     */
 +    public abstract static class Operator
 +    {
 +        public static final Operator EQ = new Equals();
 +        public static final Operator GE = new GreaterThanOrEqualTo();
 +        public static final Operator GT = new GreaterThan();
 +
 +        /**
 +         * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
 +         * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
 +         */
 +        public abstract int apply(int comparison);
 +
 +        final static class Equals extends Operator
 +        {
 +            public int apply(int comparison) { return -comparison; }
 +        }
 +
 +        final static class GreaterThanOrEqualTo extends Operator
 +        {
 +            public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; }
 +        }
 +
 +        final static class GreaterThan extends Operator
 +        {
 +            public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
 +        }
 +    }
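
To make the apply() contract concrete, a small illustrative walk (hypothetical
helper, not part of this class): with cmp = candidate.compareTo(target), the
three operators fold into a single stop/match/continue decision during a
forward seek.

    // cmp < 0: candidate sorts before the target; cmp == 0: equal; cmp > 0: after.
    static boolean seekMatches(SSTableReader.Operator op, int cmp)
    {
        int r = op.apply(cmp);
        // r == 0 : the candidate satisfies the operator -> match
        // r > 0  : a later candidate might still satisfy it -> keep scanning forward
        // r < 0  : no candidate at or beyond this point can satisfy it -> stop
        return r == 0;
    }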
 +
 +    public long getBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getFalsePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentFalsePositiveCount();
 +    }
 +
 +    public long getBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getTruePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentTruePositiveCount();
 +    }
 +
 +    public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
 +    {
 +        return keyCache;
 +    }
 +
 +    public EstimatedHistogram getEstimatedRowSize()
 +    {
 +        return sstableMetadata.estimatedRowSize;
 +    }
 +
 +    public EstimatedHistogram getEstimatedColumnCount()
 +    {
 +        return sstableMetadata.estimatedColumnCount;
 +    }
 +
 +    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
 +    {
 +        return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
 +    }
 +
 +    public double getDroppableTombstonesBefore(int gcBefore)
 +    {
 +        return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
 +    }
 +
 +    public double getCompressionRatio()
 +    {
 +        return sstableMetadata.compressionRatio;
 +    }
 +
 +    public ReplayPosition getReplayPosition()
 +    {
 +        return sstableMetadata.replayPosition;
 +    }
 +
 +    public long getMinTimestamp()
 +    {
 +        return sstableMetadata.minTimestamp;
 +    }
 +
 +    public long getMaxTimestamp()
 +    {
 +        return sstableMetadata.maxTimestamp;
 +    }
 +
 +    public Set<Integer> getAncestors()
 +    {
 +        try
 +        {
 +            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
 +            return compactionMetadata.ancestors;
 +        }
 +        catch (IOException e)
 +        {
 +            SSTableReader.logOpenException(descriptor, e);
 +            return Collections.emptySet();
 +        }
 +    }
 +
 +    public int getSSTableLevel()
 +    {
 +        return sstableMetadata.sstableLevel;
 +    }
 +
 +    /**
 +     * Reloads the sstable metadata from disk.
 +     *
 +     * Called after the level is changed on an sstable, for example if the sstable is dropped to L0
 +     *
 +     * Might be possible to remove in future versions
 +     *
 +     * @throws IOException
 +     */
 +    public void reloadSSTableMetadata() throws IOException
 +    {
 +        this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
 +    }
 +
 +    public StatsMetadata getSSTableMetadata()
 +    {
 +        return sstableMetadata;
 +    }
 +
 +    public RandomAccessReader openDataReader(RateLimiter limiter)
 +    {
 +        assert limiter != null;
 +        return compression
 +                ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter)
 +                : ThrottledReader.open(new File(getFilename()), limiter);
 +    }
 +
 +    public RandomAccessReader openDataReader()
 +    {
 +        return compression
 +                ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata())
 +                : RandomAccessReader.open(new File(getFilename()));
 +    }
 +
 +    public RandomAccessReader openIndexReader()
 +    {
 +        return RandomAccessReader.open(new File(getIndexFilename()));
 +    }
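
As a sketch of throttled use (assuming the limiter is Guava's RateLimiter, as
the parameter type indicates), a compaction-style read can cap its disk
bandwidth like so, with the usual close-quietly cleanup:

    RateLimiter limiter = RateLimiter.create(16d * 1024 * 1024); // ~16 MB/s read budget (illustrative)
    RandomAccessReader reader = sstable.openDataReader(limiter);
    try
    {
        // ... sequential reads are throttled against the shared limiter ...
    }
    finally
    {
        FileUtils.closeQuietly(reader);
    }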
 +
 +    /**
 +     * @param component component to get the timestamp for.
 +     * @return last modified time for the given component. 0 if the given component does not exist or an IO error occurs.
 +     */
 +    public long getCreationTimeFor(Component component)
 +    {
 +        return new File(descriptor.filenameFor(component)).lastModified();
 +    }
 +
 +    /**
 +     * @return Number of key cache hits
 +     */
 +    public long getKeyCacheHit()
 +    {
 +        return keyCacheHit.get();
 +    }
 +
 +    /**
 +     * @return Number of key cache requests
 +     */
 +    public long getKeyCacheRequest()
 +    {
 +        return keyCacheRequest.get();
 +    }
 +
 +    /**
 +     * @param sstables the readers to acquire references on
 +     * @return true if all desired references were acquired.  Otherwise, any partially acquired references are released and false is returned.
 +     */
 +    public static boolean acquireReferences(Iterable<SSTableReader> sstables)
 +    {
 +        SSTableReader failed = null;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (!sstable.acquireReference())
 +            {
 +                failed = sstable;
 +                break;
 +            }
 +        }
 +
 +        if (failed == null)
 +            return true;
 +
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable == failed)
 +                break;
 +            sstable.releaseReference();
 +        }
 +        return false;
 +    }
 +
 +    public static void releaseReferences(Iterable<SSTableReader> sstables)
 +    {
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sstable.releaseReference();
 +        }
 +    }
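
These bulk helpers are all-or-nothing, so a caller can treat the whole set
atomically; a minimal sketch (illustrative helper, not part of this class) of
the intended pairing:

    void withReferences(Iterable<SSTableReader> sstables, Runnable task)
    {
        if (!SSTableReader.acquireReferences(sstables))
            return; // a member was concurrently released; refresh the set and retry
        try
        {
            task.run();
        }
        finally
        {
            SSTableReader.releaseReferences(sstables);
        }
    }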
 +
 +    private void dropPageCache()
 +    {
 +        dropPageCache(dfile.path);
 +        dropPageCache(ifile.path);
 +    }
 +
 +    private void dropPageCache(String filePath)
 +    {
 +        RandomAccessFile file = null;
 +
 +        try
 +        {
 +            file = new RandomAccessFile(filePath, "r");
 +
 +            int fd = CLibrary.getfd(file.getFD());
 +
 +            if (fd > 0)
 +            {
 +                if (logger.isDebugEnabled())
 +                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
 +
 +                CLibrary.trySkipCache(fd, 0, 0);
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            // we don't care if cache cleanup fails
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(file);
 +        }
 +    }
 +
 +    /**
 +     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
 +     * slice queries, row cache hits, or non-query reads, like compaction.
 +     */
 +    public void incrementReadCount()
 +    {
 +        if (readMeter != null)
 +            readMeter.mark();
 +    }
 +
-     protected class EmptyCompactionScanner implements ICompactionScanner
-     {
-         private final String filename;
- 
-         public EmptyCompactionScanner(String filename)
-         {
-             this.filename = filename;
-         }
- 
-         public long getLengthInBytes()
-         {
-             return 0;
-         }
- 
-         public long getCurrentPosition()
-         {
-             return 0;
-         }
- 
-         public String getBackingFiles()
-         {
-             return filename;
-         }
- 
-         public boolean hasNext()
-         {
-             return false;
-         }
- 
-         public OnDiskAtomIterator next()
-         {
-             return null;
-         }
- 
-         public void close() throws IOException { }
- 
-         public void remove() { }
-     }
- 
 +    public static class SizeComparator implements Comparator<SSTableReader>
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
 +        }
 +    }
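
For example (illustrative; sstables is an assumed collection of readers), to
visit the smallest sstables first:

    List<SSTableReader> bySize = new ArrayList<>(sstables);
    Collections.sort(bySize, new SSTableReader.SizeComparator()); // ascending on-disk size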
 +
 +    public static abstract class Factory
 +    {
 +        public abstract SSTableReader open(final Descriptor descriptor,
 +                                           Set<Component> components,
 +                                           CFMetaData metadata,
 +                                           IPartitioner partitioner,
 +                                           Long maxDataAge,
 +                                           StatsMetadata sstableMetadata,
 +                                           OpenReason openReason);
 +
 +    }
 +}