Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/04 16:18:13 UTC

[1/4] cassandra git commit: IndexSummaryBuilder utilises offheap memory, and shares data between each IndexSummary opened from it

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 3c3fefa04 -> f3c0e11e2
  refs/heads/trunk 20b62de80 -> e473ce066


IndexSummaryBuilder utilises offheap memory, and shares data between
each IndexSummary opened from it

patch by benedict; reviewed by ariel for CASSANDRA-8757

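For illustration only (hypothetical names; the real classes are Memory/SafeMemory, Ref and SharedCloseable): the builder writes into a single off-heap region, and every summary opened from it takes a reference-counted view of that same region rather than a private copy, so the memory is released only when the last view closes. A minimal JDK-only sketch of the sharing pattern:

    import java.nio.ByteBuffer;
    import java.util.concurrent.atomic.AtomicInteger;

    public class SharedSummarySketch
    {
        static final class SharedRegion implements AutoCloseable
        {
            final ByteBuffer offheap;                         // one allocation, shared by every view
            final AtomicInteger refs = new AtomicInteger(1);

            SharedRegion(int capacity) { offheap = ByteBuffer.allocateDirect(capacity); }

            SharedRegion sharedCopy()                         // each summary built takes one of these
            {
                refs.incrementAndGet();
                return this;
            }

            public void close()
            {
                if (refs.decrementAndGet() == 0)
                    System.out.println("last view closed; region may be freed");
            }
        }

        public static void main(String[] args)
        {
            SharedRegion builderBuffer = new SharedRegion(1024);
            SharedRegion earlySummary = builderBuffer.sharedCopy();  // early-opened reader
            SharedRegion finalSummary = builderBuffer.sharedCopy();  // final reader
            earlySummary.close();
            finalSummary.close();
            builderBuffer.close();                                   // only now is the memory released
        }
    }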

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f3c0e11e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f3c0e11e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f3c0e11e

Branch: refs/heads/cassandra-2.1
Commit: f3c0e11e2ddb0b0666e7723a3fca005707b778ea
Parents: 3c3fefa
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Mar 4 15:07:32 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Mar 4 15:07:32 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../cassandra/io/sstable/IndexSummary.java      | 113 ++++++++++-----
 .../io/sstable/IndexSummaryBuilder.java         | 144 +++++++++----------
 .../cassandra/io/sstable/SSTableReader.java     |  69 ++++-----
 .../cassandra/io/sstable/SSTableWriter.java     |   4 +-
 .../cassandra/io/util/AbstractDataOutput.java   |   4 +-
 .../cassandra/io/util/DataOutputPlus.java       |   3 +-
 .../org/apache/cassandra/io/util/Memory.java    |  40 +++++-
 .../cassandra/io/util/SafeMemoryWriter.java     | 136 ++++++++++++++++++
 .../apache/cassandra/utils/concurrent/Ref.java  |   2 +-
 .../concurrent/WrappedSharedCloseable.java      |  14 +-
 .../cassandra/io/sstable/IndexSummaryTest.java  |  54 +++----
 .../cassandra/io/util/DataOutputTest.java       |  11 ++
 13 files changed, 411 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6133536..3b373ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,8 @@
  * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
    (CASSANDRA-8154)
  * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
+ * IndexSummaryBuilder utilises offheap memory, and shares data between
+   each IndexSummary opened from it (CASSANDRA-8757)
 Merged from 2.0:
  * Add offline tool to relevel sstables (CASSANDRA-8301)
  * Preserve stream ID for more protocol errors (CASSANDRA-8848)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 0cde124..bad50b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.io.sstable;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
-import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.IPartitioner;
@@ -54,9 +54,16 @@ public class IndexSummary extends WrappedSharedCloseable
     private final int minIndexInterval;
 
     private final IPartitioner partitioner;
-    private final int summarySize;
     private final int sizeAtFullSampling;
-    private final Memory bytes;
+    // we permit the memory to span a range larger than we use,
+    // so we have an accompanying count and length for each part.
+    // We split our data into two ranges: offsets (indexing into entries)
+    // and entries containing the summary data.
+    private final Memory offsets;
+    private final int offsetCount;
+    // entries is a list of (partition key, index file offset) pairs
+    private final Memory entries;
+    private final long entriesLength;
 
     /**
      * A value between 1 and BASE_SAMPLING_LEVEL that represents how many of the original
@@ -66,15 +73,18 @@ public class IndexSummary extends WrappedSharedCloseable
      */
     private final int samplingLevel;
 
-    public IndexSummary(IPartitioner partitioner, Memory bytes, int summarySize, int sizeAtFullSampling,
-                        int minIndexInterval, int samplingLevel)
+    public IndexSummary(IPartitioner partitioner, Memory offsets, int offsetCount, Memory entries, long entriesLength,
+                        int sizeAtFullSampling, int minIndexInterval, int samplingLevel)
     {
-        super(bytes);
+        super(new Memory[] { offsets, entries });
+        assert offsets.getInt(0) == 0;
         this.partitioner = partitioner;
         this.minIndexInterval = minIndexInterval;
-        this.summarySize = summarySize;
+        this.offsetCount = offsetCount;
+        this.entriesLength = entriesLength;
         this.sizeAtFullSampling = sizeAtFullSampling;
-        this.bytes = bytes;
+        this.offsets = offsets;
+        this.entries = entries;
         this.samplingLevel = samplingLevel;
     }
 
@@ -83,9 +93,11 @@ public class IndexSummary extends WrappedSharedCloseable
         super(copy);
         this.partitioner = copy.partitioner;
         this.minIndexInterval = copy.minIndexInterval;
-        this.summarySize = copy.summarySize;
+        this.offsetCount = copy.offsetCount;
+        this.entriesLength = copy.entriesLength;
         this.sizeAtFullSampling = copy.sizeAtFullSampling;
-        this.bytes = copy.bytes;
+        this.offsets = copy.offsets;
+        this.entries = copy.entries;
         this.samplingLevel = copy.samplingLevel;
     }
 
@@ -93,7 +105,7 @@ public class IndexSummary extends WrappedSharedCloseable
     // Harmony's Collections implementation
     public int binarySearch(RowPosition key)
     {
-        int low = 0, mid = summarySize, high = mid - 1, result = -1;
+        int low = 0, mid = offsetCount, high = mid - 1, result = -1;
         while (low <= high)
         {
             mid = (low + high) >> 1;
@@ -123,7 +135,7 @@ public class IndexSummary extends WrappedSharedCloseable
     public int getPositionInSummary(int index)
     {
         // The first section of bytes holds a four-byte position for each entry in the summary, so just multiply by 4.
-        return bytes.getInt(index << 2);
+        return offsets.getInt(index << 2);
     }
 
     public byte[] getKey(int index)
@@ -131,27 +143,23 @@ public class IndexSummary extends WrappedSharedCloseable
         long start = getPositionInSummary(index);
         int keySize = (int) (calculateEnd(index) - start - 8L);
         byte[] key = new byte[keySize];
-        bytes.getBytes(start, key, 0, keySize);
+        entries.getBytes(start, key, 0, keySize);
         return key;
     }
 
     public long getPosition(int index)
     {
-        return bytes.getLong(calculateEnd(index) - 8);
+        return entries.getLong(calculateEnd(index) - 8);
     }
 
-    public byte[] getEntry(int index)
+    public long getEndInSummary(int index)
     {
-        long start = getPositionInSummary(index);
-        long end = calculateEnd(index);
-        byte[] entry = new byte[(int)(end - start)];
-        bytes.getBytes(start, entry, 0, (int) (end - start));
-        return entry;
+        return calculateEnd(index);
     }
 
     private long calculateEnd(int index)
     {
-        return index == (summarySize - 1) ? bytes.size() : getPositionInSummary(index + 1);
+        return index == (offsetCount - 1) ? entriesLength : getPositionInSummary(index + 1);
     }
 
     public int getMinIndexInterval()
@@ -174,7 +182,7 @@ public class IndexSummary extends WrappedSharedCloseable
 
     public int size()
     {
-        return summarySize;
+        return offsetCount;
     }
 
     public int getSamplingLevel()
@@ -192,12 +200,27 @@ public class IndexSummary extends WrappedSharedCloseable
     }
 
     /**
-     * Returns the amount of off-heap memory used for this summary.
+     * Returns the amount of off-heap memory used for the entries portion of this summary.
      * @return size in bytes
      */
-    public long getOffHeapSize()
+    long getEntriesLength()
+    {
+        return entriesLength;
+    }
+
+    Memory getOffsets()
+    {
+        return offsets;
+    }
+
+    Memory getEntries()
+    {
+        return entries;
+    }
+
+    long getOffHeapSize()
     {
-        return bytes.size();
+        return offsetCount * 4 + entriesLength;
     }
 
     /**
@@ -224,14 +247,29 @@ public class IndexSummary extends WrappedSharedCloseable
         public void serialize(IndexSummary t, DataOutputPlus out, boolean withSamplingLevel) throws IOException
         {
             out.writeInt(t.minIndexInterval);
-            out.writeInt(t.summarySize);
-            out.writeLong(t.bytes.size());
+            out.writeInt(t.offsetCount);
+            out.writeLong(t.getOffHeapSize());
             if (withSamplingLevel)
             {
                 out.writeInt(t.samplingLevel);
                 out.writeInt(t.sizeAtFullSampling);
             }
-            out.write(t.bytes);
+            // our on-disk representation treats the offsets and the summary data as one contiguous structure,
+            // in which the offsets are relative to the start of the structure; i.e., if the offsets occupy
+            // X bytes, the value of the first offset will be X. In memory we split the two regions up, so that
+            // the summary values are indexed from zero, so we apply a correction to the offsets when de/serializing.
+            // In this case adding X to each of the offsets.
+            int baseOffset = t.offsetCount * 4;
+            for (int i = 0 ; i < t.offsetCount ; i++)
+            {
+                int offset = t.offsets.getInt(i * 4) + baseOffset;
+                // our serialization format for this file uses native byte order, so if this is different to the
+                // default Java serialization order (BIG_ENDIAN) we have to reverse our bytes
+                if (ByteOrder.nativeOrder() != ByteOrder.BIG_ENDIAN)
+                    offset = Integer.reverseBytes(offset);
+                out.writeInt(offset);
+            }
+            out.write(t.entries, 0, t.entriesLength);
         }
 
         public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedMinIndexInterval, int maxIndexInterval) throws IOException
@@ -243,7 +281,7 @@ public class IndexSummary extends WrappedSharedCloseable
                                                     minIndexInterval, expectedMinIndexInterval));
             }
 
-            int summarySize = in.readInt();
+            int offsetCount = in.readInt();
             long offheapSize = in.readLong();
             int samplingLevel, fullSamplingSummarySize;
             if (haveSamplingLevel)
@@ -254,7 +292,7 @@ public class IndexSummary extends WrappedSharedCloseable
             else
             {
                 samplingLevel = BASE_SAMPLING_LEVEL;
-                fullSamplingSummarySize = summarySize;
+                fullSamplingSummarySize = offsetCount;
             }
 
             int effectiveIndexInterval = (int) Math.ceil((BASE_SAMPLING_LEVEL / (double) samplingLevel) * minIndexInterval);
@@ -264,9 +302,18 @@ public class IndexSummary extends WrappedSharedCloseable
                                                     " the current max index interval (%d)", effectiveIndexInterval, maxIndexInterval));
             }
 
-            RefCountedMemory memory = new RefCountedMemory(offheapSize);
-            FBUtilities.copy(in, new MemoryOutputStream(memory), offheapSize);
-            return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, minIndexInterval, samplingLevel);
+            Memory offsets = Memory.allocate(offsetCount * 4);
+            Memory entries = Memory.allocate(offheapSize - offsets.size());
+            FBUtilities.copy(in, new MemoryOutputStream(offsets), offsets.size());
+            FBUtilities.copy(in, new MemoryOutputStream(entries), entries.size());
+            // our on-disk representation treats the offsets and the summary data as one contiguous structure,
+            // in which the offsets are relative to the start of the structure; i.e., if the offsets occupy
+            // X bytes, the value of the first offset will be X. In memory we split the two regions up, so that
+            // the summary values are indexed from zero, so we apply a correction to the offsets when de/serializing.
+            // In this case subtracting X from each of the offsets.
+            for (int i = 0 ; i < offsets.size() ; i += 4)
+                offsets.setInt(i, (int) (offsets.getInt(i) - offsets.size()));
+            return new IndexSummary(partitioner, offsets, offsetCount, entries, entries.size(), fullSamplingSummarySize, minIndexInterval, samplingLevel);
         }
     }
 }

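The offset correction in the serializer above is easiest to see with concrete numbers. A self-contained sketch (plain arrays standing in for the Memory regions): with three entries the offsets occupy 12 bytes, so an in-memory offset of 0 serializes as 12, and deserialization subtracts the 12 back out.

    public class OffsetRebaseSketch
    {
        public static void main(String[] args)
        {
            int offsetCount = 3;
            int baseOffset = offsetCount * 4;          // 12: bytes occupied by the offsets themselves
            int[] inMemory = { 0, 20, 45 };            // entry starts, relative to the entries region

            int[] onDisk = new int[offsetCount];
            for (int i = 0; i < offsetCount; i++)
                onDisk[i] = inMemory[i] + baseOffset;  // serialize: { 12, 32, 57 }

            for (int i = 0; i < offsetCount; i++)
                assert onDisk[i] - baseOffset == inMemory[i]; // deserialize subtracts it back
        }
    }
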
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 3b93b31..54e8dd2 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,36 +17,33 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.nio.ByteOrder;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.SafeMemoryWriter;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
-import static org.apache.cassandra.io.sstable.SSTable.getMinimalKey;
 
-public class IndexSummaryBuilder
+public class IndexSummaryBuilder implements AutoCloseable
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class);
 
-    private final ArrayList<Long> positions;
-    private final ArrayList<DecoratedKey> keys;
+    // the offset in the keys memory region to look for a given summary boundary
+    private final SafeMemoryWriter offsets;
+    private final SafeMemoryWriter entries;
+
     private final int minIndexInterval;
     private final int samplingLevel;
     private final int[] startPoints;
     private long keysWritten = 0;
     private long indexIntervalMatches = 0;
-    private long offheapSize = 0;
     private long nextSamplePosition;
 
     // for each ReadableBoundary, we map its dataLength property to itself, permitting us to lookup the
@@ -75,11 +72,15 @@ public class IndexSummaryBuilder
         final DecoratedKey lastKey;
         final long indexLength;
         final long dataLength;
-        public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength)
+        final int summaryCount;
+        final long entriesLength;
+        public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength, int summaryCount, long entriesLength)
         {
             this.lastKey = lastKey;
             this.indexLength = indexLength;
             this.dataLength = dataLength;
+            this.summaryCount = summaryCount;
+            this.entriesLength = entriesLength;
         }
     }
 
@@ -105,10 +106,9 @@ public class IndexSummaryBuilder
         }
 
         // for initializing data structures, adjust our estimates based on the sampling level
-        maxExpectedEntries = (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL;
-        positions = new ArrayList<>((int)maxExpectedEntries);
-        keys = new ArrayList<>((int)maxExpectedEntries);
-        // if we're downsampling we may not use index 0
+        maxExpectedEntries = Math.max(1, (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL);
+        offsets = new SafeMemoryWriter(4 * maxExpectedEntries).withByteOrder(ByteOrder.nativeOrder());
+        entries = new SafeMemoryWriter(40 * maxExpectedEntries).withByteOrder(ByteOrder.nativeOrder());
         setNextSamplePosition(-minIndexInterval);
     }
 
@@ -165,16 +165,16 @@ public class IndexSummaryBuilder
     {
         if (keysWritten == nextSamplePosition)
         {
-            keys.add(getMinimalKey(decoratedKey));
-            offheapSize += decoratedKey.getKey().remaining();
-            positions.add(indexStart);
-            offheapSize += TypeSizes.NATIVE.sizeof(indexStart);
+            assert entries.length() <= Integer.MAX_VALUE;
+            offsets.writeInt((int) entries.length());
+            entries.write(decoratedKey.getKey());
+            entries.writeLong(indexStart);
             setNextSamplePosition(keysWritten);
         }
         else if (dataEnd != 0 && keysWritten + 1 == nextSamplePosition)
         {
             // this is the last key in this summary interval, so stash it
-            ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd);
+            ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd, (int)(offsets.length() / 4), entries.length());
             lastReadableByData.put(dataEnd, boundary);
             lastReadableByIndex.put(indexEnd, boundary);
         }
@@ -201,52 +201,39 @@ public class IndexSummaryBuilder
 
     public IndexSummary build(IPartitioner partitioner)
     {
+        // this method should only be called when we've finished appending records, so we truncate the
+        // memory we're using to the exact amount required to represent it before building our summary
+        entries.setCapacity(entries.length());
+        offsets.setCapacity(offsets.length());
         return build(partitioner, null);
     }
 
-    // lastIntervalKey should come from getLastReadableBoundary().lastKey
-    public IndexSummary build(IPartitioner partitioner, DecoratedKey lastIntervalKey)
+    // build the summary up to the provided boundary; this is backed by shared memory between
+    // multiple invocations of this build method
+    public IndexSummary build(IPartitioner partitioner, ReadableBoundary boundary)
     {
-        assert keys.size() > 0;
-        assert keys.size() == positions.size();
-
-        int length;
-        if (lastIntervalKey == null)
-            length = keys.size();
-        else // since it's an inclusive upper bound, this should never match exactly
-            length = -1 -Collections.binarySearch(keys, lastIntervalKey);
-
-        assert length > 0;
-
-        long offheapSize = this.offheapSize;
-        if (length < keys.size())
-            for (int i = length ; i < keys.size() ; i++)
-                offheapSize -= keys.get(i).getKey().remaining() + TypeSizes.NATIVE.sizeof(positions.get(i));
-
-        // first we write out the position in the *summary* for each key in the summary,
-        // then we write out (key, actual index position) pairs
-        Memory memory = Memory.allocate(offheapSize + (length * 4));
-        int idxPosition = 0;
-        int keyPosition = length * 4;
-        for (int i = 0; i < length; i++)
+        assert entries.length() > 0;
+
+        int count = (int) (offsets.length() / 4);
+        long entriesLength = entries.length();
+        if (boundary != null)
         {
-            // write the position of the actual entry in the index summary (4 bytes)
-            memory.setInt(idxPosition, keyPosition);
-            idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
-
-            // write the key
-            ByteBuffer keyBytes = keys.get(i).getKey();
-            memory.setBytes(keyPosition, keyBytes);
-            keyPosition += keyBytes.remaining();
-
-            // write the position in the actual index file
-            long actualIndexPosition = positions.get(i);
-            memory.setLong(keyPosition, actualIndexPosition);
-            keyPosition += TypeSizes.NATIVE.sizeof(actualIndexPosition);
+            count = boundary.summaryCount;
+            entriesLength = boundary.entriesLength;
         }
-        assert keyPosition == offheapSize + (length * 4);
+
         int sizeAtFullSampling = (int) Math.ceil(keysWritten / (double) minIndexInterval);
-        return new IndexSummary(partitioner, memory, length, sizeAtFullSampling, minIndexInterval, samplingLevel);
+        assert count > 0;
+        return new IndexSummary(partitioner, offsets.currentBuffer().sharedCopy(),
+                                count, entries.currentBuffer().sharedCopy(), entriesLength,
+                                sizeAtFullSampling, minIndexInterval, samplingLevel);
+    }
+
+    // close the builder and release any associated memory
+    public void close()
+    {
+        entries.close();
+        offsets.close();
     }
 
     public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
@@ -294,26 +281,25 @@ public class IndexSummaryBuilder
         int[] startPoints = Downsampling.getStartPoints(currentSamplingLevel, newSamplingLevel);
 
         // calculate new off-heap size
-        int removedKeyCount = 0;
-        long newOffHeapSize = existing.getOffHeapSize();
+        int newKeyCount = existing.size();
+        long newEntriesLength = existing.getEntriesLength();
         for (int start : startPoints)
         {
             for (int j = start; j < existing.size(); j += currentSamplingLevel)
             {
-                removedKeyCount++;
-                newOffHeapSize -= existing.getEntry(j).length;
+                newKeyCount--;
+                long length = existing.getEndInSummary(j) - existing.getPositionInSummary(j);
+                newEntriesLength -= length;
             }
         }
 
-        int newKeyCount = existing.size() - removedKeyCount;
-
-        // Subtract (removedKeyCount * 4) from the new size to account for fewer entries in the first section, which
-        // stores the position of the actual entries in the summary.
-        RefCountedMemory memory = new RefCountedMemory(newOffHeapSize - (removedKeyCount * 4));
+        Memory oldEntries = existing.getEntries();
+        Memory newOffsets = Memory.allocate(newKeyCount * 4);
+        Memory newEntries = Memory.allocate(newEntriesLength);
 
         // Copy old entries to our new Memory.
-        int idxPosition = 0;
-        int keyPosition = newKeyCount * 4;
+        int i = 0;
+        int newEntriesOffset = 0;
         outer:
         for (int oldSummaryIndex = 0; oldSummaryIndex < existing.size(); oldSummaryIndex++)
         {
@@ -326,15 +312,15 @@ public class IndexSummaryBuilder
             }
 
             // write the position of the actual entry in the index summary (4 bytes)
-            memory.setInt(idxPosition, keyPosition);
-            idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
-
-            // write the entry itself
-            byte[] entry = existing.getEntry(oldSummaryIndex);
-            memory.setBytes(keyPosition, entry, 0, entry.length);
-            keyPosition += entry.length;
+            newOffsets.setInt(i * 4, newEntriesOffset);
+            i++;
+            long start = existing.getPositionInSummary(oldSummaryIndex);
+            long length = existing.getEndInSummary(oldSummaryIndex) - start;
+            newEntries.put(newEntriesOffset, oldEntries, start, length);
+            newEntriesOffset += length;
         }
-        return new IndexSummary(partitioner, memory, newKeyCount, existing.getMaxNumberOfEntries(),
-                                minIndexInterval, newSamplingLevel);
+        assert newEntriesOffset == newEntriesLength;
+        return new IndexSummary(partitioner, newOffsets, newKeyCount, newEntries, newEntriesLength,
+                                existing.getMaxNumberOfEntries(), minIndexInterval, newSamplingLevel);
     }
 }

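The record layout maybeAddEntry produces is two parallel streams: a 4-byte offset into the entries region per sample, and, in the entries region, the key bytes followed by an 8-byte index position. A JDK-only sketch of that layout (ByteBuffers standing in for SafeMemoryWriter):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class SummaryLayoutSketch
    {
        public static void main(String[] args)
        {
            ByteBuffer offsets = ByteBuffer.allocate(64).order(ByteOrder.nativeOrder());
            ByteBuffer entries = ByteBuffer.allocate(64).order(ByteOrder.nativeOrder());

            byte[][] keys = { { 'a' }, { 'b', 'b' } };
            long[] indexStarts = { 0L, 128L };

            for (int i = 0; i < keys.length; i++)
            {
                offsets.putInt(entries.position());    // offsets.writeInt((int) entries.length())
                entries.put(keys[i]);                  // entries.write(decoratedKey.getKey())
                entries.putLong(indexStarts[i]);       // entries.writeLong(indexStart)
            }
            // entry i spans [offset[i], offset[i+1]): its final 8 bytes are the index position
            // and the preceding bytes the key, matching IndexSummary.getKey()/getPosition().
        }
    }
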
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 973b0c9..41e4adb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -736,40 +736,40 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
             long indexSize = primaryIndex.length();
             long histogramCount = sstableMetadata.estimatedRowSize.count();
             long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
-                               ? histogramCount
-                               : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
+                                 ? histogramCount
+                                 : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
 
-            if (recreateBloomFilter)
-                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
-
-            IndexSummaryBuilder summaryBuilder = null;
-            if (!summaryLoaded)
-                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
-
-            long indexPosition;
-            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+            try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel))
             {
-                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
-                RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex, descriptor.version);
-                DecoratedKey decoratedKey = partitioner.decorateKey(key);
-                if (first == null)
-                    first = decoratedKey;
-                last = decoratedKey;
 
                 if (recreateBloomFilter)
-                    bf.add(decoratedKey.getKey());
+                    bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
 
-                // if summary was already read from disk we don't want to re-populate it using primary index
-                if (!summaryLoaded)
+                long indexPosition;
+                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
                 {
-                    summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
-                    ibuilder.addPotentialBoundary(indexPosition);
-                    dbuilder.addPotentialBoundary(indexEntry.position);
+                    ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+                    RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex, descriptor.version);
+                    DecoratedKey decoratedKey = partitioner.decorateKey(key);
+                    if (first == null)
+                        first = decoratedKey;
+                    last = decoratedKey;
+
+                    if (recreateBloomFilter)
+                        bf.add(decoratedKey.getKey());
+
+                    // if summary was already read from disk we don't want to re-populate it using primary index
+                    if (!summaryLoaded)
+                    {
+                        summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+                        ibuilder.addPotentialBoundary(indexPosition);
+                        dbuilder.addPotentialBoundary(indexEntry.position);
+                    }
                 }
-            }
 
-            if (!summaryLoaded)
-                indexSummary = summaryBuilder.build(partitioner);
+                if (!summaryLoaded)
+                    indexSummary = summaryBuilder.build(partitioner);
+            }
         }
         finally
         {
@@ -1004,16 +1004,17 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
         try
         {
             long indexSize = primaryIndex.length();
-            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
-
-            long indexPosition;
-            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+            try (IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel))
             {
-                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
-                RowIndexEntry.Serializer.skip(primaryIndex);
-            }
+                long indexPosition;
+                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+                {
+                    summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+                    RowIndexEntry.Serializer.skip(primaryIndex);
+                }
 
-            return summaryBuilder.build(partitioner);
+                return summaryBuilder.build(partitioner);
+            }
         }
         finally
         {

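One detail worth noting in the refactor above: try-with-resources permits a null resource and simply skips its close(), which is what makes the summaryLoaded ? null : new IndexSummaryBuilder(...) form safe. A minimal demonstration:

    public class NullResourceSketch
    {
        static class Noisy implements AutoCloseable
        {
            public void close() { System.out.println("closed"); }
        }

        public static void main(String[] args)
        {
            boolean summaryLoaded = true;
            try (Noisy builder = summaryLoaded ? null : new Noisy())
            {
                // nothing prints while summaryLoaded is true; flip the flag and "closed" prints
            }
        }
    }
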
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index b67685d..b35b652 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -403,7 +403,7 @@ public class SSTableWriter extends SSTable
         SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
                                                            components, metadata,
                                                            partitioner, ifile,
-                                                           dfile, iwriter.summary.build(partitioner, boundary.lastKey),
+                                                           dfile, iwriter.summary.build(partitioner, boundary),
                                                            iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
 
         // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
@@ -470,6 +470,7 @@ public class SSTableWriter extends SSTable
         if (finishType.isFinal)
         {
             iwriter.bf.close();
+            iwriter.summary.close();
             // try to save the summaries to disk
             sstable.saveSummary(iwriter.builder, dbuilder);
             iwriter = null;
@@ -627,6 +628,7 @@ public class SSTableWriter extends SSTable
 
         public void abort()
         {
+            summary.close();
             indexFile.abort();
             bf.close();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
index 3e38293..8f4bed8 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
@@ -321,9 +321,9 @@ public abstract class AbstractDataOutput extends OutputStream implements DataOut
         }
     }
 
-    public void write(Memory memory) throws IOException
+    public void write(Memory memory, long offset, long length) throws IOException
     {
-        for (ByteBuffer buffer : memory.asByteBuffers())
+        for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
             write(buffer);
     }
 }

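The widened write(Memory, offset, length) relies on asByteBuffers to split a native range, which may exceed 2GB, into ByteBuffer-sized pieces, since a single ByteBuffer is capped at Integer.MAX_VALUE bytes. A standalone rendering of the chunk arithmetic (hypothetical, not the Memory class itself):

    public class ChunkSketch
    {
        public static void main(String[] args)
        {
            long length = 5_000_000_000L;                         // > Integer.MAX_VALUE, needs 3 chunks
            int chunks = (int) (length / Integer.MAX_VALUE) + 1;  // same arithmetic as asByteBuffers
            int size = (int) (length / chunks);                   // equal share per full chunk

            long consumed = (long) size * (chunks - 1);
            long lastChunk = length - consumed;                   // remainder goes to the final buffer
            System.out.println(chunks + " chunks of " + size + ", last = " + lastChunk);
        }
    }
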
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index 36c25ee..c2901e1 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -27,6 +27,5 @@ public interface DataOutputPlus extends DataOutput
     // write the buffer without modifying its position
     void write(ByteBuffer buffer) throws IOException;
 
-    void write(Memory memory) throws IOException;
-
+    void write(Memory memory, long offset, long length) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index ea78840..dcb9de6 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -25,6 +25,7 @@ import com.sun.jna.Native;
 import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 import sun.misc.Unsafe;
 import sun.nio.ch.DirectBuffer;
@@ -77,6 +78,9 @@ public class Memory implements AutoCloseable
         if (bytes < 0)
             throw new IllegalArgumentException();
 
+        if (Ref.DEBUG_ENABLED)
+            return new SafeMemory(bytes);
+
         return new Memory(bytes);
     }
 
@@ -163,6 +167,33 @@ public class Memory implements AutoCloseable
         }
     }
 
+    public void setShort(long offset, short l)
+    {
+        checkBounds(offset, offset + 2);
+        if (unaligned)
+        {
+            unsafe.putShort(peer + offset, l);
+        }
+        else
+        {
+            putShortByByte(peer + offset, l);
+        }
+    }
+
+    private void putShortByByte(long address, short value)
+    {
+        if (bigEndian)
+        {
+            unsafe.putByte(address, (byte) (value >> 8));
+            unsafe.putByte(address + 1, (byte) (value));
+        }
+        else
+        {
+            unsafe.putByte(address + 1, (byte) (value >> 8));
+            unsafe.putByte(address, (byte) (value));
+        }
+    }
+
     public void setBytes(long memoryOffset, ByteBuffer buffer)
     {
         if (buffer == null)
@@ -340,20 +371,20 @@ public class Memory implements AutoCloseable
         return false;
     }
 
-    public ByteBuffer[] asByteBuffers()
+    public ByteBuffer[] asByteBuffers(long offset, long length)
     {
         if (length == 0)
             return new ByteBuffer[0];
 
-        ByteBuffer[] result = new ByteBuffer[(int) (size() / Integer.MAX_VALUE) + 1];
-        long offset = 0;
+        ByteBuffer[] result = new ByteBuffer[(int) (length / Integer.MAX_VALUE) + 1];
         int size = (int) (length / result.length);
         for (int i = 0 ; i < result.length - 1 ; i++)
         {
             result[i] = MemoryUtil.getByteBuffer(peer + offset, size);
             offset += size;
+            length -= size;
         }
-        result[result.length - 1] = MemoryUtil.getByteBuffer(peer + offset, (int) (size() - offset));
+        result[result.length - 1] = MemoryUtil.getByteBuffer(peer + offset, (int) length);
         return result;
     }
 
@@ -366,5 +397,4 @@ public class Memory implements AutoCloseable
     {
         return String.format("Memory@[%x..%x)", peer, peer + size);
     }
-
 }

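setShort above follows the file's existing aligned/unaligned split: where unaligned access is unavailable, the short is decomposed into single bytes written in the requested order. A self-contained sketch of that decomposition (plain arrays in place of Unsafe):

    public class ShortBytesSketch
    {
        static void putShortBigEndian(byte[] a, int i, short v)
        {
            a[i]     = (byte) (v >> 8);    // high byte first
            a[i + 1] = (byte) v;
        }

        static void putShortLittleEndian(byte[] a, int i, short v)
        {
            a[i + 1] = (byte) (v >> 8);    // high byte last
            a[i]     = (byte) v;
        }

        public static void main(String[] args)
        {
            byte[] a = new byte[2];
            putShortBigEndian(a, 0, (short) 0x1234);
            assert a[0] == 0x12 && a[1] == 0x34;   // run with -ea to check
        }
    }
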
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
new file mode 100644
index 0000000..1998cc6
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class SafeMemoryWriter extends AbstractDataOutput implements DataOutputPlus
+{
+    private ByteOrder order = ByteOrder.BIG_ENDIAN;
+    private SafeMemory buffer;
+    private long length;
+
+    public SafeMemoryWriter(long initialCapacity)
+    {
+        buffer = new SafeMemory(initialCapacity);
+    }
+
+    public void write(byte[] buffer, int offset, int count)
+    {
+        long newLength = ensureCapacity(count);
+        this.buffer.setBytes(this.length, buffer, offset, count);
+        this.length = newLength;
+    }
+
+    public void write(int oneByte)
+    {
+        long newLength = ensureCapacity(1);
+        buffer.setByte(length, (byte) oneByte);
+        length = newLength;
+    }
+
+    public void writeShort(int val) throws IOException
+    {
+        if (order != ByteOrder.nativeOrder())
+            val = Short.reverseBytes((short) val);
+        long newLength = ensureCapacity(2);
+        buffer.setShort(length, (short) val);
+        length = newLength;
+    }
+
+    public void writeInt(int val)
+    {
+        if (order != ByteOrder.nativeOrder())
+            val = Integer.reverseBytes(val);
+        long newLength = ensureCapacity(4);
+        buffer.setInt(length, val);
+        length = newLength;
+    }
+
+    public void writeLong(long val)
+    {
+        if (order != ByteOrder.nativeOrder())
+            val = Long.reverseBytes(val);
+        long newLength = ensureCapacity(8);
+        buffer.setLong(length, val);
+        length = newLength;
+    }
+
+    public void write(ByteBuffer buffer)
+    {
+        long newLength = ensureCapacity(buffer.remaining());
+        this.buffer.setBytes(length, buffer);
+        length = newLength;
+    }
+
+    public void write(Memory memory)
+    {
+        long newLength = ensureCapacity(memory.size());
+        buffer.put(length, memory, 0, memory.size());
+        length = newLength;
+    }
+
+    private long ensureCapacity(long size)
+    {
+        long newLength = this.length + size;
+        if (newLength > buffer.size())
+            setCapacity(Math.max(newLength, buffer.size() + (buffer.size() / 2)));
+        return newLength;
+    }
+
+    public SafeMemory currentBuffer()
+    {
+        return buffer;
+    }
+
+    public void setCapacity(long newCapacity)
+    {
+        if (newCapacity != capacity())
+        {
+            SafeMemory oldBuffer = buffer;
+            buffer = this.buffer.copy(newCapacity);
+            oldBuffer.free();
+        }
+    }
+
+    public void close()
+    {
+        buffer.close();
+    }
+
+    public long length()
+    {
+        return length;
+    }
+
+    public long capacity()
+    {
+        return buffer.size();
+    }
+
+    // TODO: consider hoisting this into DataOutputPlus, since most implementations can cope with this gracefully
+    // this would simplify IndexSummary.IndexSummarySerializer.serialize()
+    public SafeMemoryWriter withByteOrder(ByteOrder order)
+    {
+        this.order = order;
+        return this;
+    }
+}

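SafeMemoryWriter grows its backing allocation much like ArrayList grows its array: when a write would overflow, the new capacity is the larger of the required length and 1.5x the current size, amortising the cost of the copy in setCapacity. A sketch of ensureCapacity's policy in isolation:

    public class GrowthSketch
    {
        public static void main(String[] args)
        {
            long capacity = 10;                      // new SafeMemoryWriter(10)
            long length = 0;
            long[] writes = { 4, 8, 8, 345 };        // bytes appended by successive writes

            for (long size : writes)
            {
                long newLength = length + size;
                if (newLength > capacity)            // mirrors ensureCapacity/setCapacity
                    capacity = Math.max(newLength, capacity + capacity / 2);
                length = newLength;
                System.out.println("length=" + length + " capacity=" + capacity);
            }
        }
    }
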
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 8213c46..4e6cef7 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -50,7 +50,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 public final class Ref<T> implements RefCounted<T>, AutoCloseable
 {
     static final Logger logger = LoggerFactory.getLogger(Ref.class);
-    static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
+    public static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
 
     final State state;
     final T referent;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
index c656f28..96e226c 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
@@ -18,26 +18,34 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
+import java.util.Arrays;
+
 /**
  * An implementation of SharedCloseable that wraps a normal AutoCloseable,
  * ensuring its close method is only called when all instances of SharedCloseable have been
  */
 public abstract class WrappedSharedCloseable extends SharedCloseableImpl
 {
-    final AutoCloseable wrapped;
+    final AutoCloseable[] wrapped;
 
     public WrappedSharedCloseable(final AutoCloseable closeable)
     {
+        this(new AutoCloseable[] { closeable});
+    }
+
+    public WrappedSharedCloseable(final AutoCloseable[] closeable)
+    {
         super(new RefCounted.Tidy()
         {
             public void tidy() throws Exception
             {
-                closeable.close();
+                for (AutoCloseable c : closeable)
+                    c.close();
             }
 
             public String name()
             {
-                return closeable.toString();
+                return Arrays.toString(closeable);
             }
         });
         wrapped = closeable;

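WrappedSharedCloseable now wraps an array of AutoCloseables, all closed together when the final reference is tidied; IndexSummary uses this to tie the offsets and entries regions to one lifecycle. A minimal standalone rendering of the tidy loop:

    import java.util.Arrays;

    public class MultiCloseSketch
    {
        public static void main(String[] args) throws Exception
        {
            AutoCloseable[] wrapped = {
                () -> System.out.println("offsets closed"),
                () -> System.out.println("entries closed")
            };
            System.out.println("tidying " + Arrays.toString(wrapped));
            for (AutoCloseable c : wrapped)   // the same loop tidy() runs above
                c.close();
        }
    }
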
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 9aca66d..9c709a3 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -91,38 +91,42 @@ public class IndexSummaryTest
     public void testAddEmptyKey() throws Exception
     {
         IPartitioner p = new RandomPartitioner();
-        IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL);
-        builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0);
-        IndexSummary summary = builder.build(p);
-        assertEquals(1, summary.size());
-        assertEquals(0, summary.getPosition(0));
-        assertArrayEquals(new byte[0], summary.getKey(0));
-
-        DataOutputBuffer dos = new DataOutputBuffer();
-        IndexSummary.serializer.serialize(summary, dos, false);
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
-        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1, 1);
-
-        assertEquals(1, loaded.size());
-        assertEquals(summary.getPosition(0), loaded.getPosition(0));
-        assertArrayEquals(summary.getKey(0), summary.getKey(0));
+        try (IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL))
+        {
+            builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0);
+            IndexSummary summary = builder.build(p);
+            assertEquals(1, summary.size());
+            assertEquals(0, summary.getPosition(0));
+            assertArrayEquals(new byte[0], summary.getKey(0));
+
+            DataOutputBuffer dos = new DataOutputBuffer();
+            IndexSummary.serializer.serialize(summary, dos, false);
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
+            IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1, 1);
+
+            assertEquals(1, loaded.size());
+            assertEquals(summary.getPosition(0), loaded.getPosition(0));
+            assertArrayEquals(summary.getKey(0), loaded.getKey(0));
+        }
     }
 
     private Pair<List<DecoratedKey>, IndexSummary> generateRandomIndex(int size, int interval)
     {
         List<DecoratedKey> list = Lists.newArrayList();
-        IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval, BASE_SAMPLING_LEVEL);
-        for (int i = 0; i < size; i++)
+        try (IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval, BASE_SAMPLING_LEVEL))
         {
-            UUID uuid = UUID.randomUUID();
-            DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(uuid));
-            list.add(key);
+            for (int i = 0; i < size; i++)
+            {
+                UUID uuid = UUID.randomUUID();
+                DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(uuid));
+                list.add(key);
+            }
+            Collections.sort(list);
+            for (int i = 0; i < size; i++)
+                builder.maybeAddEntry(list.get(i), i);
+            IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
+            return Pair.create(list, summary);
         }
-        Collections.sort(list);
-        for (int i = 0; i < size; i++)
-            builder.maybeAddEntry(list.get(i), i);
-        IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
-        return Pair.create(list, summary);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 76f3304..7110d1d 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -92,6 +92,17 @@ public class DataOutputTest
     }
 
     @Test
+    public void testSafeMemoryWriter() throws IOException
+    {
+        SafeMemoryWriter write = new SafeMemoryWriter(10);
+        DataInput canon = testWrite(write);
+        byte[] bytes = new byte[345];
+        write.currentBuffer().getBytes(0, bytes, 0, 345);
+        DataInput test = new DataInputStream(new ByteArrayInputStream(bytes));
+        testRead(test, canon);
+    }
+
+    @Test
     public void testFileOutputStream() throws IOException
     {
         File file = FileUtils.createTempFile("dataoutput", "test");


[3/4] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e473ce06/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 0186c68,0000000..a1b923d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -1,583 -1,0 +1,585 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format.big;
 +
 +import java.io.DataInput;
 +import java.io.File;
 +import java.io.FileOutputStream;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collections;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
 +import org.apache.cassandra.io.util.FileMark;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.io.util.SegmentedFile;
 +import org.apache.cassandra.io.util.SequentialWriter;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.FilterFactory;
 +import org.apache.cassandra.utils.IFilter;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.StreamingHistogram;
 +
 +public class BigTableWriter extends SSTableWriter
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class);
 +
 +    // not very random, but the only value that can't be mistaken for a legal column-name length
 +    public static final int END_OF_ROW = 0x0000;
 +
 +    private IndexWriter iwriter;
 +    private SegmentedFile.Builder dbuilder;
 +    private final SequentialWriter dataFile;
 +    private DecoratedKey lastWrittenKey;
 +    private FileMark dataMark;
 +
 +    BigTableWriter(Descriptor descriptor, Long keyCount, Long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
 +    {
 +        super(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector);
 +
 +        if (compression)
 +        {
 +            dataFile = SequentialWriter.open(getFilename(),
 +                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
 +                                             metadata.compressionParameters(),
 +                                             metadataCollector);
 +            dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile);
 +        }
 +        else
 +        {
 +            dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
 +            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +        }
 +        iwriter = new IndexWriter(keyCount, dataFile);
 +    }
 +
 +    public void mark()
 +    {
 +        dataMark = dataFile.mark();
 +        iwriter.mark();
 +    }
 +
 +    public void resetAndTruncate()
 +    {
 +        dataFile.resetAndTruncate(dataMark);
 +        iwriter.resetAndTruncate();
 +    }
 +
 +    /**
 +     * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
 +     */
 +    private long beforeAppend(DecoratedKey decoratedKey)
 +    {
 +        assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values
 +        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
 +            throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
 +        return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
 +    }
 +
 +    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
 +    {
 +        metadataCollector.addKey(decoratedKey.getKey());
 +        lastWrittenKey = decoratedKey;
 +        last = lastWrittenKey;
 +        if (first == null)
 +            first = lastWrittenKey;
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("wrote {} at {}", decoratedKey, dataPosition);
 +        iwriter.append(decoratedKey, index, dataPosition);
 +        dbuilder.addPotentialBoundary(dataPosition);
 +    }
 +
 +    /**
 +     * @param row
 +     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
 +     */
 +    public RowIndexEntry append(AbstractCompactedRow row)
 +    {
 +        long startPosition = beforeAppend(row.key);
 +        RowIndexEntry entry;
 +        try
 +        {
 +            entry = row.write(startPosition, dataFile);
 +            if (entry == null)
 +                return null;
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        long endPosition = dataFile.getFilePointer();
 +        metadataCollector.update(endPosition - startPosition, row.columnStats());
 +        afterAppend(row.key, endPosition, entry);
 +        return entry;
 +    }
 +
 +    public void append(DecoratedKey decoratedKey, ColumnFamily cf)
 +    {
 +        if (decoratedKey.getKey().remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
 +        {
 +            logger.error("Key size {} exceeds maximum of {}, skipping row",
 +                         decoratedKey.getKey().remaining(),
 +                         FBUtilities.MAX_UNSIGNED_SHORT);
 +            return;
 +        }
 +
 +        long startPosition = beforeAppend(decoratedKey);
 +        try
 +        {
 +            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
 +            afterAppend(decoratedKey, startPosition, entry);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +        metadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
 +    }
 +
 +    private static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutputPlus out) throws IOException
 +    {
 +        assert cf.hasColumns() || cf.isMarkedForDelete();
 +
 +        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.getKey(), out);
 +        ColumnIndex index = builder.build(cf);
 +
 +        out.writeShort(END_OF_ROW);
 +        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
 +    }
 +
 +    /**
 +     * @throws IOException if a read from the DataInput fails
 +     * @throws FSWriteError if a write to the dataFile fails
 +     */
 +    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in, Version version) throws IOException
 +    {
 +        long currentPosition = beforeAppend(key);
 +
 +        ColumnStats.MaxLongTracker maxTimestampTracker = new ColumnStats.MaxLongTracker(Long.MAX_VALUE);
 +        ColumnStats.MinLongTracker minTimestampTracker = new ColumnStats.MinLongTracker(Long.MIN_VALUE);
 +        ColumnStats.MaxIntTracker maxDeletionTimeTracker = new ColumnStats.MaxIntTracker(Integer.MAX_VALUE);
 +        List<ByteBuffer> minColumnNames = Collections.emptyList();
 +        List<ByteBuffer> maxColumnNames = Collections.emptyList();
 +        StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
 +        boolean hasLegacyCounterShards = false;
 +
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
 +        cf.delete(DeletionTime.serializer.deserialize(in));
 +
 +        ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.getKey(), dataFile.stream);
 +
 +        if (cf.deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
 +        {
 +            tombstones.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            maxDeletionTimeTracker.update(cf.deletionInfo().getTopLevelDeletion().localDeletionTime);
 +            minTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +            maxTimestampTracker.update(cf.deletionInfo().getTopLevelDeletion().markedForDeleteAt);
 +        }
 +
 +        Iterator<RangeTombstone> rangeTombstoneIterator = cf.deletionInfo().rangeIterator();
 +        while (rangeTombstoneIterator.hasNext())
 +        {
 +            RangeTombstone rangeTombstone = rangeTombstoneIterator.next();
 +            tombstones.update(rangeTombstone.getLocalDeletionTime());
 +            minTimestampTracker.update(rangeTombstone.timestamp());
 +            maxTimestampTracker.update(rangeTombstone.timestamp());
 +            maxDeletionTimeTracker.update(rangeTombstone.getLocalDeletionTime());
 +            minColumnNames = ColumnNameHelper.minComponents(minColumnNames, rangeTombstone.min, metadata.comparator);
 +            maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, rangeTombstone.max, metadata.comparator);
 +        }
 +
 +        Iterator<OnDiskAtom> iter = AbstractCell.onDiskIterator(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, version, metadata.comparator);
 +        try
 +        {
 +            while (iter.hasNext())
 +            {
 +                OnDiskAtom atom = iter.next();
 +                if (atom == null)
 +                    break;
 +
 +                if (atom instanceof CounterCell)
 +                {
 +                    atom = ((CounterCell) atom).markLocalToBeCleared();
 +                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
 +                }
 +
 +                int deletionTime = atom.getLocalDeletionTime();
 +                if (deletionTime < Integer.MAX_VALUE)
 +                    tombstones.update(deletionTime);
 +                minTimestampTracker.update(atom.timestamp());
 +                maxTimestampTracker.update(atom.timestamp());
 +                minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(), metadata.comparator);
 +                maxColumnNames = ColumnNameHelper.maxComponents(maxColumnNames, atom.name(), metadata.comparator);
 +                maxDeletionTimeTracker.update(atom.getLocalDeletionTime());
 +
 +                columnIndexer.add(atom); // this also writes the atom to disk
 +            }
 +
 +            columnIndexer.maybeWriteEmptyRowHeader();
 +            dataFile.stream.writeShort(END_OF_ROW);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, dataFile.getPath());
 +        }
 +
 +        metadataCollector.updateMinTimestamp(minTimestampTracker.get())
 +                         .updateMaxTimestamp(maxTimestampTracker.get())
 +                         .updateMaxLocalDeletionTime(maxDeletionTimeTracker.get())
 +                         .addRowSize(dataFile.getFilePointer() - currentPosition)
 +                         .addColumnCount(columnIndexer.writtenAtomCount())
 +                         .mergeTombstoneHistogram(tombstones)
 +                         .updateMinColumnNames(minColumnNames)
 +                         .updateMaxColumnNames(maxColumnNames)
 +                         .updateHasLegacyCounterShards(hasLegacyCounterShards);
 +
 +        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(), columnIndexer.build()));
 +        return currentPosition;
 +    }
 +
 +    /**
 +     * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
 +     */
 +    public void abort()
 +    {
 +        assert descriptor.type.isTemporary;
 +        if (iwriter == null && dataFile == null)
 +            return;
 +
 +        if (iwriter != null)
 +            iwriter.abort();
 +
 +        if (dataFile != null)
 +            dataFile.abort();
 +
 +        Set<Component> components = SSTable.componentsFor(descriptor);
 +        try
 +        {
 +            if (!components.isEmpty())
 +                SSTable.delete(descriptor, components);
 +        }
 +        catch (FSWriteError e)
 +        {
 +            logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
 +            throw e;
 +        }
 +    }
 +
 +    // we use this method to ensure any managed data we may have retained references to during the write are no
 +    // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
 +    public void isolateReferences()
 +    {
 +        // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
 +        // data retention is done through copying
 +        first = getMinimalKey(first);
 +        last = lastWrittenKey = getMinimalKey(last);
 +    }
 +
 +    private Descriptor makeTmpLinks()
 +    {
 +        // create temp links if they don't already exist
 +        Descriptor link = descriptor.asType(Descriptor.Type.TEMPLINK);
 +        if (!new File(link.filenameFor(Component.PRIMARY_INDEX)).exists())
 +        {
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), new File(link.filenameFor(Component.PRIMARY_INDEX)));
 +            FileUtils.createHardLink(new File(descriptor.filenameFor(Component.DATA)), new File(link.filenameFor(Component.DATA)));
 +        }
 +        return link;
 +    }
 +
 +    public SSTableReader openEarly(long maxDataAge)
 +    {
 +        StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
 +                                                  metadata.getBloomFilterFpChance(),
 +                                                  repairedAt).get(MetadataType.STATS);
 +
 +        // find the max (exclusive) readable key
 +        IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
 +        if (boundary == null)
 +            return null;
 +
 +        assert boundary.indexLength > 0 && boundary.dataLength > 0;
 +        Descriptor link = makeTmpLinks();
 +        // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable to other consumers
 +        SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), boundary.indexLength);
 +        SegmentedFile dfile = dbuilder.complete(link.filenameFor(Component.DATA), boundary.dataLength);
 +        SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
 +                                                           components, metadata,
 +                                                           partitioner, ifile,
-                                                            dfile, iwriter.summary.build(partitioner, boundary.lastKey),
++                                                           dfile, iwriter.summary.build(partitioner, boundary),
 +                                                           iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
 +
 +        // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(boundary.lastKey);
 +        return sstable;
 +    }
 +
 +    public SSTableReader closeAndOpenReader()
 +    {
 +        return closeAndOpenReader(System.currentTimeMillis());
 +    }
 +
 +    public SSTableReader closeAndOpenReader(long maxDataAge)
 +    {
 +        return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
 +    }
 +
 +    public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
 +    {
 +        assert finishType != FinishType.CLOSE;
 +        Pair<Descriptor, StatsMetadata> p;
 +
 +        p = close(finishType, repairedAt < 0 ? this.repairedAt : repairedAt);
 +        Descriptor desc = p.left;
 +        StatsMetadata metadata = p.right;
 +
 +        if (finishType == FinishType.EARLY)
 +            desc = makeTmpLinks();
 +
 +        // finalize in-memory state for the reader
 +        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType.isFinal);
 +        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType.isFinal);
 +        SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
 +                                                           components,
 +                                                           this.metadata,
 +                                                           partitioner,
 +                                                           ifile,
 +                                                           dfile,
 +                                                           iwriter.summary.build(partitioner),
 +                                                           iwriter.bf.sharedCopy(),
 +                                                           maxDataAge,
 +                                                           metadata,
 +                                                           finishType.openReason);
 +        sstable.first = getMinimalKey(first);
 +        sstable.last = getMinimalKey(last);
 +
 +        if (finishType.isFinal)
 +        {
 +            iwriter.bf.close();
++            iwriter.summary.close();
 +            // try to save the summaries to disk
 +            sstable.saveSummary(iwriter.builder, dbuilder);
 +            iwriter = null;
 +            dbuilder = null;
 +        }
 +        return sstable;
 +    }
 +
 +    // Close the writer and return the descriptor for the new sstable, along with its metadata
 +    public Pair<Descriptor, StatsMetadata> close()
 +    {
 +        return close(FinishType.CLOSE, this.repairedAt);
 +    }
 +
 +    private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
 +    {
 +        switch (type)
 +        {
 +            case EARLY: case CLOSE: case NORMAL:
 +            iwriter.close();
 +            dataFile.close();
 +            if (type == FinishType.CLOSE)
 +                iwriter.bf.close();
 +        }
 +
 +        // write sstable statistics
 +        Map<MetadataType, MetadataComponent> metadataComponents;
 +        metadataComponents = metadataCollector
 +                             .finalizeMetadata(partitioner.getClass().getCanonicalName(),
 +                                               metadata.getBloomFilterFpChance(), repairedAt);
 +
 +        // remove the 'tmp' marker from all components
 +        Descriptor descriptor = this.descriptor;
 +        if (type.isFinal)
 +        {
 +            dataFile.writeFullChecksum(descriptor);
 +            writeMetadata(descriptor, metadataComponents);
 +            // save the table of components
 +            SSTable.appendTOC(descriptor, components);
 +            descriptor = rename(descriptor, components);
 +        }
 +
 +        return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
 +    }
 +
 +    private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
 +    {
 +        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
 +        try
 +        {
 +            desc.getMetadataSerializer().serialize(components, out.stream);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, out.getPath());
 +        }
 +        finally
 +        {
 +            out.close();
 +        }
 +    }
 +
 +    public long getFilePointer()
 +    {
 +        return dataFile.getFilePointer();
 +    }
 +
 +    public long getOnDiskFilePointer()
 +    {
 +        return dataFile.getOnDiskFilePointer();
 +    }
 +
 +    /**
 +     * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
 +     */
 +    class IndexWriter
 +    {
 +        private final SequentialWriter indexFile;
 +        public final SegmentedFile.Builder builder;
 +        public final IndexSummaryBuilder summary;
 +        public final IFilter bf;
 +        private FileMark mark;
 +
 +        IndexWriter(long keyCount, final SequentialWriter dataFile)
 +        {
 +            indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +            builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +            summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
 +            bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
 +            // register listeners to be alerted when the data files are flushed
 +            indexFile.setPostFlushListener(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    summary.markIndexSynced(indexFile.getLastFlushOffset());
 +                }
 +            });
 +            dataFile.setPostFlushListener(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    summary.markDataSynced(dataFile.getLastFlushOffset());
 +                }
 +            });
 +        }
 +
 +        // finds the last decorated key (and its index/data offsets) that can be guaranteed to occur fully in the flushed portion of the index file
 +        IndexSummaryBuilder.ReadableBoundary getMaxReadable()
 +        {
 +            return summary.getLastReadableBoundary();
 +        }
 +
 +        public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd)
 +        {
 +            bf.add(key);
 +            long indexStart = indexFile.getFilePointer();
 +            try
 +            {
 +                ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
 +                rowIndexEntrySerializer.serialize(indexEntry, indexFile.stream);
 +            }
 +            catch (IOException e)
 +            {
 +                throw new FSWriteError(e, indexFile.getPath());
 +            }
 +            long indexEnd = indexFile.getFilePointer();
 +
 +            if (logger.isTraceEnabled())
 +                logger.trace("wrote index entry: {} at {}", indexEntry, indexStart);
 +
 +            summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
 +            builder.addPotentialBoundary(indexStart);
 +        }
 +
 +        public void abort()
 +        {
++            summary.close();
 +            indexFile.abort();
 +            bf.close();
 +        }
 +
 +        /**
 +         * Closes the index and bloom filter, making the public state of this writer valid for consumption.
 +         */
 +        public void close()
 +        {
 +            if (components.contains(Component.FILTER))
 +            {
 +                String path = descriptor.filenameFor(Component.FILTER);
 +                try
 +                {
 +                    // bloom filter
 +                    FileOutputStream fos = new FileOutputStream(path);
 +                    DataOutputStreamAndChannel stream = new DataOutputStreamAndChannel(fos);
 +                    FilterFactory.serialize(bf, stream);
 +                    stream.flush();
 +                    fos.getFD().sync();
 +                    stream.close();
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new FSWriteError(e, path);
 +                }
 +            }
 +
 +            // index
 +            long position = indexFile.getFilePointer();
 +            indexFile.close(); // calls force
 +            FileUtils.truncate(indexFile.getPath(), position);
 +        }
 +
 +        public void mark()
 +        {
 +            mark = indexFile.mark();
 +        }
 +
 +        public void resetAndTruncate()
 +        {
 +            // we can't un-set the bloom filter addition, but extra keys in there are harmless.
 +            // we can't reset dbuilder either, but that is the last thing called in afterAppend, so
 +            // we assume that if that worked then we won't be trying to reset.
 +            indexFile.resetAndTruncate(mark);
 +        }
 +    }
 +}
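
The early-open machinery above hinges on "readable boundaries": the writer registers post-flush listeners on the index and data files, and the summary builder records, for each sampled key, how far both files must be flushed before that key is fully readable. A minimal, self-contained sketch of that bookkeeping (hypothetical names, String keys standing in for DecoratedKey; the real logic lives in IndexSummaryBuilder's lastReadableByIndex/lastReadableByData maps further down this patch):

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    // Boundaries are recorded in append order, so indexEnd and dataEnd both
    // increase monotonically; the last key an early-opened reader may serve is
    // the latest boundary whose index AND data prefixes have both been flushed.
    class BoundaryTracker
    {
        static final class Boundary
        {
            final String lastKey;
            final long indexEnd, dataEnd;
            Boundary(String lastKey, long indexEnd, long dataEnd)
            {
                this.lastKey = lastKey; this.indexEnd = indexEnd; this.dataEnd = dataEnd;
            }
        }

        private final NavigableMap<Long, Boundary> byIndex = new TreeMap<>();
        private final NavigableMap<Long, Boundary> byData = new TreeMap<>();
        private long indexSynced, dataSynced;

        void record(String key, long indexEnd, long dataEnd)
        {
            Boundary b = new Boundary(key, indexEnd, dataEnd);
            byIndex.put(indexEnd, b);
            byData.put(dataEnd, b);
        }

        // invoked from the files' post-flush listeners
        void markIndexSynced(long upTo) { indexSynced = Math.max(indexSynced, upTo); }
        void markDataSynced(long upTo)  { dataSynced = Math.max(dataSynced, upTo); }

        Boundary lastReadable()
        {
            Map.Entry<Long, Boundary> i = byIndex.floorEntry(indexSynced);
            Map.Entry<Long, Boundary> d = byData.floorEntry(dataSynced);
            if (i == null || d == null)
                return null;
            // take the earlier of the two candidates; by monotonicity it
            // satisfies both the index and the data constraint
            return i.getValue().indexEnd <= d.getValue().indexEnd ? i.getValue() : d.getValue();
        }
    }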

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e473ce06/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e473ce06/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------


[2/4] cassandra git commit: IndexSummaryBuilder utilises offheap memory, and shares data between each IndexSummary opened from it

Posted by be...@apache.org.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 0cde124..bad50b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.io.sstable;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
-import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.IPartitioner;
@@ -54,9 +54,16 @@ public class IndexSummary extends WrappedSharedCloseable
     private final int minIndexInterval;
 
     private final IPartitioner partitioner;
-    private final int summarySize;
     private final int sizeAtFullSampling;
-    private final Memory bytes;
+    // we permit the memory to span a range larger than we use,
+    // so we have an accompanying count and length for each part
+    // we split our data into two ranges: offsets (indexing into entries),
+    // and entries containing the summary data
+    private final Memory offsets;
+    private final int offsetCount;
+    // entries is a list of (partition key, index file offset) pairs
+    private final Memory entries;
+    private final long entriesLength;
 
     /**
      * A value between 1 and BASE_SAMPLING_LEVEL that represents how many of the original
@@ -66,15 +73,18 @@ public class IndexSummary extends WrappedSharedCloseable
      */
     private final int samplingLevel;
 
-    public IndexSummary(IPartitioner partitioner, Memory bytes, int summarySize, int sizeAtFullSampling,
-                        int minIndexInterval, int samplingLevel)
+    public IndexSummary(IPartitioner partitioner, Memory offsets, int offsetCount, Memory entries, long entriesLength,
+                        int sizeAtFullSampling, int minIndexInterval, int samplingLevel)
     {
-        super(bytes);
+        super(new Memory[] { offsets, entries });
+        assert offsets.getInt(0) == 0;
         this.partitioner = partitioner;
         this.minIndexInterval = minIndexInterval;
-        this.summarySize = summarySize;
+        this.offsetCount = offsetCount;
+        this.entriesLength = entriesLength;
         this.sizeAtFullSampling = sizeAtFullSampling;
-        this.bytes = bytes;
+        this.offsets = offsets;
+        this.entries = entries;
         this.samplingLevel = samplingLevel;
     }
 
@@ -83,9 +93,11 @@ public class IndexSummary extends WrappedSharedCloseable
         super(copy);
         this.partitioner = copy.partitioner;
         this.minIndexInterval = copy.minIndexInterval;
-        this.summarySize = copy.summarySize;
+        this.offsetCount = copy.offsetCount;
+        this.entriesLength = copy.entriesLength;
         this.sizeAtFullSampling = copy.sizeAtFullSampling;
-        this.bytes = copy.bytes;
+        this.offsets = copy.offsets;
+        this.entries = copy.entries;
         this.samplingLevel = copy.samplingLevel;
     }
 
@@ -93,7 +105,7 @@ public class IndexSummary extends WrappedSharedCloseable
     // Harmony's Collections implementation
     public int binarySearch(RowPosition key)
     {
-        int low = 0, mid = summarySize, high = mid - 1, result = -1;
+        int low = 0, mid = offsetCount, high = mid - 1, result = -1;
         while (low <= high)
         {
             mid = (low + high) >> 1;
@@ -123,7 +135,7 @@ public class IndexSummary extends WrappedSharedCloseable
     public int getPositionInSummary(int index)
     {
         // The first section of bytes holds a four-byte position for each entry in the summary, so just multiply by 4.
-        return bytes.getInt(index << 2);
+        return offsets.getInt(index << 2);
     }
 
     public byte[] getKey(int index)
@@ -131,27 +143,23 @@ public class IndexSummary extends WrappedSharedCloseable
         long start = getPositionInSummary(index);
         int keySize = (int) (calculateEnd(index) - start - 8L);
         byte[] key = new byte[keySize];
-        bytes.getBytes(start, key, 0, keySize);
+        entries.getBytes(start, key, 0, keySize);
         return key;
     }
 
     public long getPosition(int index)
     {
-        return bytes.getLong(calculateEnd(index) - 8);
+        return entries.getLong(calculateEnd(index) - 8);
     }
 
-    public byte[] getEntry(int index)
+    public long getEndInSummary(int index)
     {
-        long start = getPositionInSummary(index);
-        long end = calculateEnd(index);
-        byte[] entry = new byte[(int)(end - start)];
-        bytes.getBytes(start, entry, 0, (int) (end - start));
-        return entry;
+        return calculateEnd(index);
     }
 
     private long calculateEnd(int index)
     {
-        return index == (summarySize - 1) ? bytes.size() : getPositionInSummary(index + 1);
+        return index == (offsetCount - 1) ? entriesLength : getPositionInSummary(index + 1);
     }
 
     public int getMinIndexInterval()
@@ -174,7 +182,7 @@ public class IndexSummary extends WrappedSharedCloseable
 
     public int size()
     {
-        return summarySize;
+        return offsetCount;
     }
 
     public int getSamplingLevel()
@@ -192,12 +200,27 @@ public class IndexSummary extends WrappedSharedCloseable
     }
 
     /**
-     * Returns the amount of off-heap memory used for this summary.
+     * Returns the amount of off-heap memory used for the entries portion of this summary.
      * @return size in bytes
      */
-    public long getOffHeapSize()
+    long getEntriesLength()
+    {
+        return entriesLength;
+    }
+
+    Memory getOffsets()
+    {
+        return offsets;
+    }
+
+    Memory getEntries()
+    {
+        return entries;
+    }
+
+    long getOffHeapSize()
     {
-        return bytes.size();
+        return offsetCount * 4 + entriesLength;
     }
 
     /**
@@ -224,14 +247,29 @@ public class IndexSummary extends WrappedSharedCloseable
         public void serialize(IndexSummary t, DataOutputPlus out, boolean withSamplingLevel) throws IOException
         {
             out.writeInt(t.minIndexInterval);
-            out.writeInt(t.summarySize);
-            out.writeLong(t.bytes.size());
+            out.writeInt(t.offsetCount);
+            out.writeLong(t.getOffHeapSize());
             if (withSamplingLevel)
             {
                 out.writeInt(t.samplingLevel);
                 out.writeInt(t.sizeAtFullSampling);
             }
-            out.write(t.bytes);
+            // our on-disk representation treats the offsets and the summary data as one contiguous structure,
+            // in which the offsets are based from the start of the structure. i.e., if the offsets occupy
+            // X bytes, the value of the first offset will be X. In memory we split the two regions up, so that
+            // the summary values are indexed from zero, so we apply a correction to the offsets when de/serializing.
+            // In this case adding X to each of the offsets.
+            int baseOffset = t.offsetCount * 4;
+            for (int i = 0 ; i < t.offsetCount ; i++)
+            {
+                int offset = t.offsets.getInt(i * 4) + baseOffset;
+                // our serialization format for this file uses native byte order, so if this is different to the
+                // default Java serialization order (BIG_ENDIAN) we have to reverse our bytes
+                if (ByteOrder.nativeOrder() != ByteOrder.BIG_ENDIAN)
+                    offset = Integer.reverseBytes(offset);
+                out.writeInt(offset);
+            }
+            out.write(t.entries, 0, t.entriesLength);
         }
 
         public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedMinIndexInterval, int maxIndexInterval) throws IOException
@@ -243,7 +281,7 @@ public class IndexSummary extends WrappedSharedCloseable
                                                     minIndexInterval, expectedMinIndexInterval));
             }
 
-            int summarySize = in.readInt();
+            int offsetCount = in.readInt();
             long offheapSize = in.readLong();
             int samplingLevel, fullSamplingSummarySize;
             if (haveSamplingLevel)
@@ -254,7 +292,7 @@ public class IndexSummary extends WrappedSharedCloseable
             else
             {
                 samplingLevel = BASE_SAMPLING_LEVEL;
-                fullSamplingSummarySize = summarySize;
+                fullSamplingSummarySize = offsetCount;
             }
 
             int effectiveIndexInterval = (int) Math.ceil((BASE_SAMPLING_LEVEL / (double) samplingLevel) * minIndexInterval);
@@ -264,9 +302,18 @@ public class IndexSummary extends WrappedSharedCloseable
                                                     " the current max index interval (%d)", effectiveIndexInterval, maxIndexInterval));
             }
 
-            RefCountedMemory memory = new RefCountedMemory(offheapSize);
-            FBUtilities.copy(in, new MemoryOutputStream(memory), offheapSize);
-            return new IndexSummary(partitioner, memory, summarySize, fullSamplingSummarySize, minIndexInterval, samplingLevel);
+            Memory offsets = Memory.allocate(offsetCount * 4);
+            Memory entries = Memory.allocate(offheapSize - offsets.size());
+            FBUtilities.copy(in, new MemoryOutputStream(offsets), offsets.size());
+            FBUtilities.copy(in, new MemoryOutputStream(entries), entries.size());
+            // our on-disk representation treats the offsets and the summary data as one contiguous structure,
+            // in which the offsets are based from the start of the structure. i.e., if the offsets occupy
+            // X bytes, the value of the first offset will be X. In memory we split the two regions up, so that
+            // the summary values are indexed from zero, so we apply a correction to the offsets when de/serializing.
+            // In this case subtracting X from each of the offsets.
+            for (int i = 0 ; i < offsets.size() ; i += 4)
+                offsets.setInt(i, (int) (offsets.getInt(i) - offsets.size()));
+            return new IndexSummary(partitioner, offsets, offsetCount, entries, entries.size(), fullSamplingSummarySize, minIndexInterval, samplingLevel);
         }
     }
 }
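
The subtle part of the serializer above is the offset rebasing: on disk the offsets table and the entries form one contiguous structure, so the first stored offset equals the table's size in bytes, while in memory the entries region is indexed from zero; serialization adds the table size and deserialization subtracts it (with an Integer.reverseBytes pass when the native order differs from Java's big-endian). A tiny round-trip illustration, with plain ByteBuffers standing in for Memory and hypothetical names:

    import java.nio.ByteBuffer;

    public class OffsetRebase
    {
        public static void main(String[] args)
        {
            int count = 3;
            int baseOffset = count * 4;              // bytes occupied by the offsets table
            int[] inMemory = { 0, 12, 31 };          // entry starts, relative to the entries region

            // serialize: rebase against the combined structure (add the table size),
            // so the first offset written is exactly baseOffset
            ByteBuffer onDisk = ByteBuffer.allocate(baseOffset);
            for (int off : inMemory)
                onDisk.putInt(off + baseOffset);

            // deserialize: rebase back to the entries region (subtract the table size)
            onDisk.flip();
            for (int expected : inMemory)
                System.out.println((onDisk.getInt() - baseOffset) == expected); // true, true, true
        }
    }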

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index 3b93b31..54e8dd2 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -17,36 +17,33 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.nio.ByteOrder;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.SafeMemoryWriter;
 
 import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
-import static org.apache.cassandra.io.sstable.SSTable.getMinimalKey;
 
-public class IndexSummaryBuilder
+public class IndexSummaryBuilder implements AutoCloseable
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexSummaryBuilder.class);
 
-    private final ArrayList<Long> positions;
-    private final ArrayList<DecoratedKey> keys;
+    // for each summary entry, the offset within the entries memory region at which that entry starts
+    private final SafeMemoryWriter offsets;
+    private final SafeMemoryWriter entries;
+
     private final int minIndexInterval;
     private final int samplingLevel;
     private final int[] startPoints;
     private long keysWritten = 0;
     private long indexIntervalMatches = 0;
-    private long offheapSize = 0;
     private long nextSamplePosition;
 
     // for each ReadableBoundary, we map its dataLength property to itself, permitting us to lookup the
@@ -75,11 +72,15 @@ public class IndexSummaryBuilder
         final DecoratedKey lastKey;
         final long indexLength;
         final long dataLength;
-        public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength)
+        final int summaryCount;
+        final long entriesLength;
+        public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength, int summaryCount, long entriesLength)
         {
             this.lastKey = lastKey;
             this.indexLength = indexLength;
             this.dataLength = dataLength;
+            this.summaryCount = summaryCount;
+            this.entriesLength = entriesLength;
         }
     }
 
@@ -105,10 +106,9 @@ public class IndexSummaryBuilder
         }
 
         // for initializing data structures, adjust our estimates based on the sampling level
-        maxExpectedEntries = (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL;
-        positions = new ArrayList<>((int)maxExpectedEntries);
-        keys = new ArrayList<>((int)maxExpectedEntries);
-        // if we're downsampling we may not use index 0
+        maxExpectedEntries = Math.max(1, (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL);
+        offsets = new SafeMemoryWriter(4 * maxExpectedEntries).withByteOrder(ByteOrder.nativeOrder());
+        entries = new SafeMemoryWriter(40 * maxExpectedEntries).withByteOrder(ByteOrder.nativeOrder());
         setNextSamplePosition(-minIndexInterval);
     }
 
@@ -165,16 +165,16 @@ public class IndexSummaryBuilder
     {
         if (keysWritten == nextSamplePosition)
         {
-            keys.add(getMinimalKey(decoratedKey));
-            offheapSize += decoratedKey.getKey().remaining();
-            positions.add(indexStart);
-            offheapSize += TypeSizes.NATIVE.sizeof(indexStart);
+            assert entries.length() <= Integer.MAX_VALUE;
+            offsets.writeInt((int) entries.length());
+            entries.write(decoratedKey.getKey());
+            entries.writeLong(indexStart);
             setNextSamplePosition(keysWritten);
         }
         else if (dataEnd != 0 && keysWritten + 1 == nextSamplePosition)
         {
             // this is the last key in this summary interval, so stash it
-            ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd);
+            ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd, (int)(offsets.length() / 4), entries.length());
             lastReadableByData.put(dataEnd, boundary);
             lastReadableByIndex.put(indexEnd, boundary);
         }
@@ -201,52 +201,39 @@ public class IndexSummaryBuilder
 
     public IndexSummary build(IPartitioner partitioner)
     {
+        // this method should only be called when we've finished appending records, so we truncate the
+        // memory we're using to the exact amount required to represent it before building our summary
+        entries.setCapacity(entries.length());
+        offsets.setCapacity(offsets.length());
         return build(partitioner, null);
     }
 
-    // lastIntervalKey should come from getLastReadableBoundary().lastKey
-    public IndexSummary build(IPartitioner partitioner, DecoratedKey lastIntervalKey)
+    // build the summary up to the provided boundary; the resulting summary is backed by memory
+    // shared with every other summary built by this method, across all of its invocations
+    public IndexSummary build(IPartitioner partitioner, ReadableBoundary boundary)
     {
-        assert keys.size() > 0;
-        assert keys.size() == positions.size();
-
-        int length;
-        if (lastIntervalKey == null)
-            length = keys.size();
-        else // since it's an inclusive upper bound, this should never match exactly
-            length = -1 -Collections.binarySearch(keys, lastIntervalKey);
-
-        assert length > 0;
-
-        long offheapSize = this.offheapSize;
-        if (length < keys.size())
-            for (int i = length ; i < keys.size() ; i++)
-                offheapSize -= keys.get(i).getKey().remaining() + TypeSizes.NATIVE.sizeof(positions.get(i));
-
-        // first we write out the position in the *summary* for each key in the summary,
-        // then we write out (key, actual index position) pairs
-        Memory memory = Memory.allocate(offheapSize + (length * 4));
-        int idxPosition = 0;
-        int keyPosition = length * 4;
-        for (int i = 0; i < length; i++)
+        assert entries.length() > 0;
+
+        int count = (int) (offsets.length() / 4);
+        long entriesLength = entries.length();
+        if (boundary != null)
         {
-            // write the position of the actual entry in the index summary (4 bytes)
-            memory.setInt(idxPosition, keyPosition);
-            idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
-
-            // write the key
-            ByteBuffer keyBytes = keys.get(i).getKey();
-            memory.setBytes(keyPosition, keyBytes);
-            keyPosition += keyBytes.remaining();
-
-            // write the position in the actual index file
-            long actualIndexPosition = positions.get(i);
-            memory.setLong(keyPosition, actualIndexPosition);
-            keyPosition += TypeSizes.NATIVE.sizeof(actualIndexPosition);
+            count = boundary.summaryCount;
+            entriesLength = boundary.entriesLength;
         }
-        assert keyPosition == offheapSize + (length * 4);
+
         int sizeAtFullSampling = (int) Math.ceil(keysWritten / (double) minIndexInterval);
-        return new IndexSummary(partitioner, memory, length, sizeAtFullSampling, minIndexInterval, samplingLevel);
+        assert count > 0;
+        return new IndexSummary(partitioner, offsets.currentBuffer().sharedCopy(),
+                                count, entries.currentBuffer().sharedCopy(), entriesLength,
+                                sizeAtFullSampling, minIndexInterval, samplingLevel);
+    }
+
+    // close the builder and release any associated memory
+    public void close()
+    {
+        entries.close();
+        offsets.close();
     }
 
     public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
@@ -294,26 +281,25 @@ public class IndexSummaryBuilder
         int[] startPoints = Downsampling.getStartPoints(currentSamplingLevel, newSamplingLevel);
 
         // calculate new off-heap size
-        int removedKeyCount = 0;
-        long newOffHeapSize = existing.getOffHeapSize();
+        int newKeyCount = existing.size();
+        long newEntriesLength = existing.getEntriesLength();
         for (int start : startPoints)
         {
             for (int j = start; j < existing.size(); j += currentSamplingLevel)
             {
-                removedKeyCount++;
-                newOffHeapSize -= existing.getEntry(j).length;
+                newKeyCount--;
+                long length = existing.getEndInSummary(j) - existing.getPositionInSummary(j);
+                newEntriesLength -= length;
             }
         }
 
-        int newKeyCount = existing.size() - removedKeyCount;
-
-        // Subtract (removedKeyCount * 4) from the new size to account for fewer entries in the first section, which
-        // stores the position of the actual entries in the summary.
-        RefCountedMemory memory = new RefCountedMemory(newOffHeapSize - (removedKeyCount * 4));
+        Memory oldEntries = existing.getEntries();
+        Memory newOffsets = Memory.allocate(newKeyCount * 4);
+        Memory newEntries = Memory.allocate(newEntriesLength);
 
         // Copy old entries to our new Memory.
-        int idxPosition = 0;
-        int keyPosition = newKeyCount * 4;
+        int i = 0;
+        int newEntriesOffset = 0;
         outer:
         for (int oldSummaryIndex = 0; oldSummaryIndex < existing.size(); oldSummaryIndex++)
         {
@@ -326,15 +312,15 @@ public class IndexSummaryBuilder
             }
 
             // write the position of the actual entry in the index summary (4 bytes)
-            memory.setInt(idxPosition, keyPosition);
-            idxPosition += TypeSizes.NATIVE.sizeof(keyPosition);
-
-            // write the entry itself
-            byte[] entry = existing.getEntry(oldSummaryIndex);
-            memory.setBytes(keyPosition, entry, 0, entry.length);
-            keyPosition += entry.length;
+            newOffsets.setInt(i * 4, newEntriesOffset);
+            i++;
+            long start = existing.getPositionInSummary(oldSummaryIndex);
+            long length = existing.getEndInSummary(oldSummaryIndex) - start;
+            newEntries.put(newEntriesOffset, oldEntries, start, length);
+            newEntriesOffset += length;
         }
-        return new IndexSummary(partitioner, memory, newKeyCount, existing.getMaxNumberOfEntries(),
-                                minIndexInterval, newSamplingLevel);
+        assert newEntriesOffset == newEntriesLength;
+        return new IndexSummary(partitioner, newOffsets, newKeyCount, newEntries, newEntriesLength,
+                                existing.getMaxNumberOfEntries(), minIndexInterval, newSamplingLevel);
     }
 }
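
Conceptually the builder now appends each sample into two growable regions: a 4-byte cursor into offsets, then (key bytes, 8-byte index position) into entries, which is exactly the layout IndexSummary reads back. A rough on-heap equivalent of that append path, with ByteArrayOutputStream standing in for SafeMemoryWriter and all names hypothetical:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // Two-region layout: 'offsets' holds one int per sample (the start of that
    // sample within 'entries'); 'entries' holds (key bytes, long indexPosition).
    class TwoRegionSummary
    {
        private final ByteArrayOutputStream offsetBytes = new ByteArrayOutputStream();
        private final ByteArrayOutputStream entryBytes = new ByteArrayOutputStream();
        private final DataOutputStream offsets = new DataOutputStream(offsetBytes);
        private final DataOutputStream entries = new DataOutputStream(entryBytes);

        void addSample(String key, long indexPosition) throws IOException
        {
            offsets.writeInt(entryBytes.size());          // where this entry starts
            entries.write(key.getBytes(StandardCharsets.UTF_8));
            entries.writeLong(indexPosition);             // position in the index file
        }

        int sampleCount()    { return offsetBytes.size() / 4; }
        long entriesLength() { return entryBytes.size(); }
    }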

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 973b0c9..41e4adb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -736,40 +736,40 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
             long indexSize = primaryIndex.length();
             long histogramCount = sstableMetadata.estimatedRowSize.count();
             long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
-                               ? histogramCount
-                               : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
+                                 ? histogramCount
+                                 : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
 
-            if (recreateBloomFilter)
-                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
-
-            IndexSummaryBuilder summaryBuilder = null;
-            if (!summaryLoaded)
-                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
-
-            long indexPosition;
-            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+            try(IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel))
             {
-                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
-                RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex, descriptor.version);
-                DecoratedKey decoratedKey = partitioner.decorateKey(key);
-                if (first == null)
-                    first = decoratedKey;
-                last = decoratedKey;
 
                 if (recreateBloomFilter)
-                    bf.add(decoratedKey.getKey());
+                    bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
 
-                // if summary was already read from disk we don't want to re-populate it using primary index
-                if (!summaryLoaded)
+                long indexPosition;
+                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
                 {
-                    summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
-                    ibuilder.addPotentialBoundary(indexPosition);
-                    dbuilder.addPotentialBoundary(indexEntry.position);
+                    ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+                    RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex, descriptor.version);
+                    DecoratedKey decoratedKey = partitioner.decorateKey(key);
+                    if (first == null)
+                        first = decoratedKey;
+                    last = decoratedKey;
+
+                    if (recreateBloomFilter)
+                        bf.add(decoratedKey.getKey());
+
+                    // if summary was already read from disk we don't want to re-populate it using primary index
+                    if (!summaryLoaded)
+                    {
+                        summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
+                        ibuilder.addPotentialBoundary(indexPosition);
+                        dbuilder.addPotentialBoundary(indexEntry.position);
+                    }
                 }
-            }
 
-            if (!summaryLoaded)
-                indexSummary = summaryBuilder.build(partitioner);
+                if (!summaryLoaded)
+                    indexSummary = summaryBuilder.build(partitioner);
+            }
         }
         finally
         {
@@ -1004,16 +1004,17 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
         try
         {
             long indexSize = primaryIndex.length();
-            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
-
-            long indexPosition;
-            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+            try (IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel))
             {
-                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
-                RowIndexEntry.Serializer.skip(primaryIndex);
-            }
+                long indexPosition;
+                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
+                {
+                    summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
+                    RowIndexEntry.Serializer.skip(primaryIndex);
+                }
 
-            return summaryBuilder.build(partitioner);
+                return summaryBuilder.build(partitioner);
+            }
         }
         finally
         {

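One detail worth noting in the refactor above: when the summary was already loaded from disk the builder is null, and try-with-resources tolerates a null resource by simply skipping its close(), which is what lets a single try block cover both paths. A standalone demonstration:

    public class NullResource
    {
        public static void main(String[] args) throws Exception
        {
            for (boolean needed : new boolean[] { true, false })
            {
                // a null resource is permitted: close() is simply skipped
                try (AutoCloseable c = needed ? (AutoCloseable) () -> System.out.println("closed") : null)
                {
                    System.out.println("body runs, needed=" + needed);
                }
            }
        }
    }
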
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index b67685d..b35b652 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -403,7 +403,7 @@ public class SSTableWriter extends SSTable
         SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
                                                            components, metadata,
                                                            partitioner, ifile,
-                                                           dfile, iwriter.summary.build(partitioner, boundary.lastKey),
+                                                           dfile, iwriter.summary.build(partitioner, boundary),
                                                            iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
 
         // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
@@ -470,6 +470,7 @@ public class SSTableWriter extends SSTable
         if (finishType.isFinal)
         {
             iwriter.bf.close();
+            iwriter.summary.close();
             // try to save the summaries to disk
             sstable.saveSummary(iwriter.builder, dbuilder);
             iwriter = null;
@@ -627,6 +628,7 @@ public class SSTableWriter extends SSTable
 
         public void abort()
         {
+            summary.close();
             indexFile.abort();
             bf.close();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
index 3e38293..8f4bed8 100644
--- a/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
+++ b/src/java/org/apache/cassandra/io/util/AbstractDataOutput.java
@@ -321,9 +321,9 @@ public abstract class AbstractDataOutput extends OutputStream implements DataOut
         }
     }
 
-    public void write(Memory memory) throws IOException
+    public void write(Memory memory, long offset, long length) throws IOException
     {
-        for (ByteBuffer buffer : memory.asByteBuffers())
+        for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
             write(buffer);
     }
 }
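
The new (offset, length) parameters exist because a Memory region may exceed Integer.MAX_VALUE bytes while each ByteBuffer view is capped at an int size, so a large range has to be carved into sub-2GB slices. A sketch of just the slicing arithmetic (hypothetical helper, not the patch's code):

    public class ChunkedRange
    {
        // Split a (possibly > 2GB) range into sub-ranges small enough for an
        // int-sized ByteBuffer each; mirrors the slicing in asByteBuffers(offset, length).
        static long[][] chunks(long offset, long length)
        {
            int n = (int) (length / Integer.MAX_VALUE) + 1;
            long[][] result = new long[n][2];
            long per = length / n;                 // size of all but the last slice
            for (int i = 0; i < n - 1; i++)
            {
                result[i][0] = offset;
                result[i][1] = per;
                offset += per;
                length -= per;
            }
            result[n - 1][0] = offset;             // last slice takes the remainder
            result[n - 1][1] = length;
            return result;
        }

        public static void main(String[] args)
        {
            for (long[] c : chunks(100, 5_000_000_000L))   // 5GB starting at offset 100
                System.out.println(c[0] + " +" + c[1]);    // three slices under 2GB each
        }
    }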

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index 36c25ee..c2901e1 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -27,6 +27,5 @@ public interface DataOutputPlus extends DataOutput
     // write the buffer without modifying its position
     void write(ByteBuffer buffer) throws IOException;
 
-    void write(Memory memory) throws IOException;
-
+    void write(Memory memory, long offset, long length) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index ea78840..dcb9de6 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -25,6 +25,7 @@ import com.sun.jna.Native;
 import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 import sun.misc.Unsafe;
 import sun.nio.ch.DirectBuffer;
@@ -77,6 +78,9 @@ public class Memory implements AutoCloseable
         if (bytes < 0)
             throw new IllegalArgumentException();
 
+        if (Ref.DEBUG_ENABLED)
+            return new SafeMemory(bytes);
+
         return new Memory(bytes);
     }
 
@@ -163,6 +167,33 @@ public class Memory implements AutoCloseable
         }
     }
 
+    public void setShort(long offset, short l)
+    {
+        checkBounds(offset, offset + 2);
+        if (unaligned)
+        {
+            unsafe.putShort(peer + offset, l);
+        }
+        else
+        {
+            putShortByByte(peer + offset, l);
+        }
+    }
+
+    private void putShortByByte(long address, short value)
+    {
+        if (bigEndian)
+        {
+            unsafe.putByte(address, (byte) (value >> 8));
+            unsafe.putByte(address + 1, (byte) (value));
+        }
+        else
+        {
+            unsafe.putByte(address + 1, (byte) (value >> 8));
+            unsafe.putByte(address, (byte) (value));
+        }
+    }
+
     public void setBytes(long memoryOffset, ByteBuffer buffer)
     {
         if (buffer == null)
@@ -340,20 +371,20 @@ public class Memory implements AutoCloseable
         return false;
     }
 
-    public ByteBuffer[] asByteBuffers()
+    public ByteBuffer[] asByteBuffers(long offset, long length)
     {
         if (size() == 0)
             return new ByteBuffer[0];
 
-        ByteBuffer[] result = new ByteBuffer[(int) (size() / Integer.MAX_VALUE) + 1];
-        long offset = 0;
+        ByteBuffer[] result = new ByteBuffer[(int) (length / Integer.MAX_VALUE) + 1];
         int size = (int) (size() / result.length);
         for (int i = 0 ; i < result.length - 1 ; i++)
         {
             result[i] = MemoryUtil.getByteBuffer(peer + offset, size);
             offset += size;
+            length -= size;
         }
-        result[result.length - 1] = MemoryUtil.getByteBuffer(peer + offset, (int) (size() - offset));
+        result[result.length - 1] = MemoryUtil.getByteBuffer(peer + offset, (int) length);
         return result;
     }
 
@@ -366,5 +397,4 @@ public class Memory implements AutoCloseable
     {
         return String.format("Memory@[%x..%x)", peer, peer + size);
     }
-
 }
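
The byte-at-a-time path exists for architectures where the JVM cannot perform unaligned multi-byte stores: the value is decomposed into single-byte writes honouring the platform byte order. (Note a short spans two bytes, hence the bounds check of offset + 2 above.) A standalone equivalent over a byte[], names hypothetical:

    import java.nio.ByteOrder;

    public class UnalignedShort
    {
        // Write a short one byte at a time, respecting native byte order,
        // so the store is safe even at odd (unaligned) offsets.
        static void putShort(byte[] mem, int offset, short value)
        {
            if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN)
            {
                mem[offset]     = (byte) (value >> 8);
                mem[offset + 1] = (byte) value;
            }
            else
            {
                mem[offset + 1] = (byte) (value >> 8);
                mem[offset]     = (byte) value;
            }
        }

        public static void main(String[] args)
        {
            byte[] mem = new byte[4];
            putShort(mem, 1, (short) 0x1234);   // offset 1 is unaligned for a short
            System.out.printf("%02x %02x%n", mem[1], mem[2]);  // order depends on platform
        }
    }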

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
new file mode 100644
index 0000000..1998cc6
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -0,0 +1,136 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class SafeMemoryWriter extends AbstractDataOutput implements DataOutputPlus
+{
+    private ByteOrder order = ByteOrder.BIG_ENDIAN;
+    private SafeMemory buffer;
+    private long length;
+
+    public SafeMemoryWriter(long initialCapacity)
+    {
+        buffer = new SafeMemory(initialCapacity);
+    }
+
+    public void write(byte[] buffer, int offset, int count)
+    {
+        long newLength = ensureCapacity(count);
+        this.buffer.setBytes(this.length, buffer, offset, count);
+        this.length = newLength;
+    }
+
+    public void write(int oneByte)
+    {
+        long newLength = ensureCapacity(1);
+        buffer.setByte(length, (byte) oneByte);
+        length = newLength;
+    }
+
+    public void writeShort(int val) throws IOException
+    {
+        if (order != ByteOrder.nativeOrder())
+            val = Short.reverseBytes((short) val);
+        long newLength = ensureCapacity(2);
+        buffer.setShort(length, (short) val);
+        length = newLength;
+    }
+
+    public void writeInt(int val)
+    {
+        if (order != ByteOrder.nativeOrder())
+            val = Integer.reverseBytes(val);
+        long newLength = ensureCapacity(4);
+        buffer.setInt(length, val);
+        length = newLength;
+    }
+
+    public void writeLong(long val)
+    {
+        if (order != ByteOrder.nativeOrder())
+            val = Long.reverseBytes(val);
+        long newLength = ensureCapacity(8);
+        buffer.setLong(length, val);
+        length = newLength;
+    }
+
+    public void write(ByteBuffer buffer)
+    {
+        long newLength = ensureCapacity(buffer.remaining());
+        this.buffer.setBytes(length, buffer);
+        length = newLength;
+    }
+
+    public void write(Memory memory)
+    {
+        long newLength = ensureCapacity(memory.size());
+        buffer.put(length, memory, 0, memory.size());
+        length = newLength;
+    }
+
+    private long ensureCapacity(long size)
+    {
+        long newLength = this.length + size;
+        if (newLength > buffer.size())
+            setCapacity(Math.max(newLength, buffer.size() + (buffer.size() / 2)));
+        return newLength;
+    }
+
+    public SafeMemory currentBuffer()
+    {
+        return buffer;
+    }
+
+    public void setCapacity(long newCapacity)
+    {
+        if (newCapacity != capacity())
+        {
+            SafeMemory oldBuffer = buffer;
+            buffer = this.buffer.copy(newCapacity);
+            oldBuffer.free();
+        }
+    }
+
+    public void close()
+    {
+        buffer.close();
+    }
+
+    public long length()
+    {
+        return length;
+    }
+
+    public long capacity()
+    {
+        return buffer.size();
+    }
+
+    // TODO: consider hoisting this into DataOutputPlus, since most implementations can copy with this gracefully
+    // this would simplify IndexSummary.IndexSummarySerializer.serialize()
+    public SafeMemoryWriter withByteOrder(ByteOrder order)
+    {
+        this.order = order;
+        return this;
+    }
+}
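
A brief usage sketch of the new writer (illustrative only; the 32-byte initial
capacity is arbitrary). Writes accumulate in offheap SafeMemory; ensureCapacity
grows the allocation to at least 1.5x its current size when a write would
overflow it, so the SafeMemory returned by currentBuffer() must be re-fetched
after any write:

    SafeMemoryWriter writer = new SafeMemoryWriter(32); // arbitrary initial capacity
    try
    {
        writer.writeLong(0x1234567890abcdefL); // 8 bytes
        writer.writeInt(42);                   // 4 bytes
        assert writer.length() == 12;                // logical write position
        assert writer.capacity() >= writer.length(); // offheap bytes reserved
    }
    finally
    {
        writer.close(); // releases the backing SafeMemory
    }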

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 8213c46..4e6cef7 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -50,7 +50,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 public final class Ref<T> implements RefCounted<T>, AutoCloseable
 {
     static final Logger logger = LoggerFactory.getLogger(Ref.class);
-    static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
+    public static final boolean DEBUG_ENABLED = System.getProperty("cassandra.debugrefcount", "false").equalsIgnoreCase("true");
 
     final State state;
     final T referent;
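
Since DEBUG_ENABLED is initialised in a static field, it is read once when Ref
first loads; leak tracking therefore has to be switched on via the JVM command
line before startup, e.g.:

    java -Dcassandra.debugrefcount=true ...

With the visibility widened here, Memory.allocate (see the Memory.java hunk
above) consults the same flag to hand back leak-tracked SafeMemory in place of
raw Memory.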

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
index c656f28..96e226c 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/WrappedSharedCloseable.java
@@ -18,26 +18,34 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
+import java.util.Arrays;
+
 /**
  * An implementation of SharedCloseable that wraps a normal AutoCloseable,
  * ensuring its close method is only called when all instances of SharedCloseable have been closed.
  */
 public abstract class WrappedSharedCloseable extends SharedCloseableImpl
 {
-    final AutoCloseable wrapped;
+    final AutoCloseable[] wrapped;
 
     public WrappedSharedCloseable(final AutoCloseable closeable)
     {
+        this(new AutoCloseable[] { closeable});
+    }
+
+    public WrappedSharedCloseable(final AutoCloseable[] closeable)
+    {
         super(new RefCounted.Tidy()
         {
             public void tidy() throws Exception
             {
-                closeable.close();
+                for (AutoCloseable c : closeable)
+                    c.close();
             }
 
             public String name()
             {
-                return closeable.toString();
+                return Arrays.toString(closeable);
             }
         });
         wrapped = closeable;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 9aca66d..9c709a3 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -91,38 +91,42 @@ public class IndexSummaryTest
     public void testAddEmptyKey() throws Exception
     {
         IPartitioner p = new RandomPartitioner();
-        IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL);
-        builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0);
-        IndexSummary summary = builder.build(p);
-        assertEquals(1, summary.size());
-        assertEquals(0, summary.getPosition(0));
-        assertArrayEquals(new byte[0], summary.getKey(0));
-
-        DataOutputBuffer dos = new DataOutputBuffer();
-        IndexSummary.serializer.serialize(summary, dos, false);
-        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
-        IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1, 1);
-
-        assertEquals(1, loaded.size());
-        assertEquals(summary.getPosition(0), loaded.getPosition(0));
-        assertArrayEquals(summary.getKey(0), summary.getKey(0));
+        try (IndexSummaryBuilder builder = new IndexSummaryBuilder(1, 1, BASE_SAMPLING_LEVEL))
+        {
+            builder.maybeAddEntry(p.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER), 0);
+            IndexSummary summary = builder.build(p);
+            assertEquals(1, summary.size());
+            assertEquals(0, summary.getPosition(0));
+            assertArrayEquals(new byte[0], summary.getKey(0));
+
+            DataOutputBuffer dos = new DataOutputBuffer();
+            IndexSummary.serializer.serialize(summary, dos, false);
+            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dos.toByteArray()));
+            IndexSummary loaded = IndexSummary.serializer.deserialize(dis, p, false, 1, 1);
+
+            assertEquals(1, loaded.size());
+            assertEquals(summary.getPosition(0), loaded.getPosition(0));
+            assertArrayEquals(summary.getKey(0), loaded.getKey(0));
+        }
     }
 
     private Pair<List<DecoratedKey>, IndexSummary> generateRandomIndex(int size, int interval)
     {
         List<DecoratedKey> list = Lists.newArrayList();
-        IndexSummaryBuilder builder = new IndexSummaryBuilder(list.size(), interval, BASE_SAMPLING_LEVEL);
-        for (int i = 0; i < size; i++)
+        try (IndexSummaryBuilder builder = new IndexSummaryBuilder(size, interval, BASE_SAMPLING_LEVEL))
         {
-            UUID uuid = UUID.randomUUID();
-            DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(uuid));
-            list.add(key);
+            for (int i = 0; i < size; i++)
+            {
+                UUID uuid = UUID.randomUUID();
+                DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(uuid));
+                list.add(key);
+            }
+            Collections.sort(list);
+            for (int i = 0; i < size; i++)
+                builder.maybeAddEntry(list.get(i), i);
+            IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
+            return Pair.create(list, summary);
         }
-        Collections.sort(list);
-        for (int i = 0; i < size; i++)
-            builder.maybeAddEntry(list.get(i), i);
-        IndexSummary summary = builder.build(DatabaseDescriptor.getPartitioner());
-        return Pair.create(list, summary);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f3c0e11e/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 76f3304..7110d1d 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -92,6 +92,17 @@ public class DataOutputTest
     }
 
     @Test
+    public void testSafeMemoryWriter() throws IOException
+    {
+        SafeMemoryWriter write = new SafeMemoryWriter(10);
+        DataInput canon = testWrite(write);
+        byte[] bytes = new byte[(int) write.length()]; // size from the writer, not a magic constant
+        write.currentBuffer().getBytes(0, bytes, 0, bytes.length);
+        write.close(); // release the offheap allocation
+        DataInput test = new DataInputStream(new ByteArrayInputStream(bytes));
+        testRead(test, canon);
+    }
+
+    @Test
     public void testFileOutputStream() throws IOException
     {
         File file = FileUtils.createTempFile("dataoutput", "test");


[4/4] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
	src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
	test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e473ce06
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e473ce06
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e473ce06

Branch: refs/heads/trunk
Commit: e473ce066129365fd6a5fe8ff3ffb8f292d7788d
Parents: 20b62de f3c0e11
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Mar 4 15:18:00 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Mar 4 15:18:00 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../cassandra/io/sstable/IndexSummary.java      | 113 ++++++++++----
 .../io/sstable/IndexSummaryBuilder.java         | 150 +++++++++----------
 .../io/sstable/format/SSTableReader.java        |  66 ++++----
 .../io/sstable/format/big/BigTableWriter.java   |   4 +-
 .../cassandra/io/util/AbstractDataOutput.java   |   4 +-
 .../cassandra/io/util/DataOutputPlus.java       |   3 +-
 .../org/apache/cassandra/io/util/Memory.java    |  40 ++++-
 .../cassandra/io/util/SafeMemoryWriter.java     | 136 +++++++++++++++++
 .../apache/cassandra/utils/concurrent/Ref.java  |   2 +-
 .../concurrent/WrappedSharedCloseable.java      |  14 +-
 .../cassandra/io/sstable/IndexSummaryTest.java  |  56 +++----
 .../cassandra/io/util/DataOutputTest.java       |  11 ++
 13 files changed, 413 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e473ce06/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e473ce06/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 7953d98,0000000..9b32933
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,2058 -1,0 +1,2058 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Ordering;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 +import com.clearspring.analytics.stream.cardinality.ICardinality;
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.cache.InstrumentingCache;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.metadata.*;
 +import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.apache.cassandra.utils.concurrent.Ref;
 +import org.apache.cassandra.utils.concurrent.RefCounted;
 +import org.apache.cassandra.utils.concurrent.SelfRefCounted;
 +
 +import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 +
 +/**
 + * An SSTableReader can be constructed in a number of places, but typically is either
 + * read from disk at startup, or constructed from a flushed memtable, or after compaction
 + * to replace some existing sstables. However, once created, an SSTableReader may also be modified.
 + *
 + * A reader's OpenReason describes its current stage in its lifecycle, as follows:
 + *
 + * NORMAL
 + * From:       None        => Reader has been read from disk, either at startup or from a flushed memtable
 + *             EARLY       => Reader is the final result of a compaction
 + *             MOVED_START => Reader WAS being compacted, but this failed and it has been restored to NORMAL status
 + *
 + * EARLY
 + * From:       None        => Reader is a compaction replacement that is either incomplete and has been opened
 + *                            to represent its partial result status, or has been finished but the compaction
 + *                            it is a part of has not yet completed fully
 + *             EARLY       => Same as from None, only it is not the first time it has been opened
 + *
 + * MOVED_START
 + * From:       NORMAL      => Reader is being compacted. This compaction has not finished, but the compaction result
 + *                            is either partially or fully opened, to either partially or fully replace this reader.
 + *                            This reader's start key has been updated to represent this, so that reads only hit
 + *                            one or the other reader.
 + *
 + * METADATA_CHANGE
 + * From:       NORMAL      => Reader has seen low traffic and the amount of memory available for index summaries is
 + *                            constrained, so its index summary has been downsampled.
 + *         METADATA_CHANGE => Same
 + *
 + * Note that in parallel to this, there are two different Descriptor types; TMPLINK and FINAL; the latter corresponds
 + * to NORMAL state readers and all readers that replace a NORMAL one. TMPLINK is used for EARLY state readers and
 + * no others.
 + *
 + * When a reader is being compacted, if the result is large its replacement may be opened as EARLY before compaction
 + * completes in order to present the result to consumers earlier. In this case the reader will itself be changed to
 + * a MOVED_START state, where its start no longer represents its on-disk minimum key. This is to permit reads to be
 + * directed to only one reader when the two represent the same data. The EARLY file can represent a compaction result
 + * that is either partially complete and still in-progress, or a complete and immutable sstable that is part of a larger
 + * macro compaction action that has not yet fully completed.
 + *
 + * Currently ALL compaction results at least briefly go through an EARLY open state prior to completion, regardless
 + * of if early opening is enabled.
 + *
 + * Since a reader can be created multiple times over the same shared underlying resources, and the exact resources
 + * it shares between each instance differ subtly, we track the lifetime of any underlying resource with its own
 + * reference count, which each instance takes a Ref to. Each instance then tracks references to itself, and once these
 + * all expire it releases its Refs to these underlying resources.
 + *
 + * There is some shared cleanup behaviour needed only once all sstablereaders in a certain stage of their lifecycle
 + * (i.e. EARLY or NORMAL opening), and some that must only occur once all readers of any kind over a single logical
 + * sstable have expired. These are managed by the TypeTidy and GlobalTidy classes at the bottom, and are effectively
 + * managed as another resource each instance tracks its own Ref instance to, to ensure all of these resources are
 + * cleaned up safely and can be debugged otherwise.
 + *
 + * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies
 + */
 +public abstract class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 +
 +    private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
 +    private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
 +
 +    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            long ts1 = o1.getMaxTimestamp();
 +            long ts2 = o2.getMaxTimestamp();
 +            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
 +        }
 +    };
 +
 +    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return o1.first.compareTo(o2.first);
 +        }
 +    };
 +
 +    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 +
 +    /**
 +     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMillis()) which represents an upper bound
 +     * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
 +     * later than maxDataAge.
 +     *
 +     * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
 +     *
 +     * When a new sstable is flushed, maxDataAge is set to the time of creation.
 +     * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
 +     *
 +     * The age is in milliseconds since epoch and is local to this host.
 +     */
 +    public final long maxDataAge;
 +
 +    public enum OpenReason
 +    {
 +        NORMAL,
 +        EARLY,
 +        METADATA_CHANGE,
 +        MOVED_START,
 +        SHADOWED // => MOVED_START past end
 +    }
 +
 +    public final OpenReason openReason;
 +
 +    // indexfile and datafile: might be null before a call to load()
 +    protected SegmentedFile ifile;
 +    protected SegmentedFile dfile;
 +    protected IndexSummary indexSummary;
 +    protected IFilter bf;
 +
 +    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
 +
 +    protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 +
 +    // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
 +    // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
 +    protected final AtomicBoolean isSuspect = new AtomicBoolean(false);
 +
 +    // not final since we need to be able to change level on a file.
 +    protected volatile StatsMetadata sstableMetadata;
 +
 +    protected final AtomicLong keyCacheHit = new AtomicLong(0);
 +    protected final AtomicLong keyCacheRequest = new AtomicLong(0);
 +
 +    private final InstanceTidier tidy = new InstanceTidier(descriptor, metadata);
 +    private final Ref<SSTableReader> selfRef = new Ref<>(this, tidy);
 +
 +    private RestorableMeter readMeter;
 +
 +    /**
 +     * Calculate approximate key count.
 +     * If cardinality estimator is available on all given sstables, then this method use them to estimate
 +     * key count.
 +     * If not, then this uses index summaries.
 +     *
 +     * @param sstables SSTables to calculate key count
 +     * @return estimated key count
 +     */
 +    public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
 +    {
 +        long count = -1;
 +
 +        // check if cardinality estimator is available for all SSTables
 +        boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return sstable.descriptor.version.hasNewStatsFile();
 +            }
 +        });
 +
 +        // if it is, load them to estimate key count
 +        if (cardinalityAvailable)
 +        {
 +            boolean failed = false;
 +            ICardinality cardinality = null;
 +            for (SSTableReader sstable : sstables)
 +            {
 +                try
 +                {
 +                    CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
 +                    assert metadata != null : sstable.getFilename();
 +                    if (cardinality == null)
 +                        cardinality = metadata.cardinalityEstimator;
 +                    else
 +                        cardinality = cardinality.merge(metadata.cardinalityEstimator);
 +                }
 +                catch (IOException e)
 +                {
 +                    logger.warn("Reading cardinality from Statistics.db failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +                catch (CardinalityMergeException e)
 +                {
 +                    logger.warn("Cardinality merge failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +            }
 +            if (cardinality != null && !failed)
 +                count = cardinality.cardinality();
 +        }
 +
 +        // if something went wrong above or cardinality is not available, calculate using index summary
 +        if (count < 0)
 +        {
 +            count = 0; // replace the -1 sentinel so the summed estimate is not off by one
 +            for (SSTableReader sstable : sstables)
 +                count += sstable.estimatedKeys();
 +        }
 +        return count;
 +    }
 +
 +    /**
 +     * Estimates how much of the keys we would keep if the sstables were compacted together
 +     */
 +    public static double estimateCompactionGain(Set<SSTableReader> overlapping)
 +    {
 +        Set<ICardinality> cardinalities = new HashSet<>(overlapping.size());
 +        for (SSTableReader sstable : overlapping)
 +        {
 +            try
 +            {
 +                ICardinality cardinality = ((CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION)).cardinalityEstimator;
 +                if (cardinality != null)
 +                    cardinalities.add(cardinality);
 +                else
 +                    logger.debug("Got a null cardinality estimator in: "+sstable.getFilename());
 +            }
 +            catch (IOException e)
 +            {
 +                logger.warn("Could not read up compaction metadata for " + sstable, e);
 +            }
 +        }
 +        long totalKeyCountBefore = 0;
 +        for (ICardinality cardinality : cardinalities)
 +        {
 +            totalKeyCountBefore += cardinality.cardinality();
 +        }
 +        if (totalKeyCountBefore == 0)
 +            return 1;
 +
 +        long totalKeyCountAfter = mergeCardinalities(cardinalities).cardinality();
 +        logger.debug("Estimated compaction gain: {}/{}={}", totalKeyCountAfter, totalKeyCountBefore, ((double)totalKeyCountAfter)/totalKeyCountBefore);
 +        return ((double)totalKeyCountAfter)/totalKeyCountBefore;
 +    }
 +
 +    private static ICardinality mergeCardinalities(Collection<ICardinality> cardinalities)
 +    {
 +        ICardinality base = new HyperLogLogPlus(13, 25); // see MetadataCollector.cardinality
 +        try
 +        {
 +            base = base.merge(cardinalities.toArray(new ICardinality[cardinalities.size()]));
 +        }
 +        catch (CardinalityMergeException e)
 +        {
 +            logger.warn("Could not merge cardinalities", e);
 +        }
 +        return base;
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor) throws IOException
 +    {
 +        CFMetaData metadata;
 +        if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
 +        {
 +            int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
 +            String parentName = descriptor.cfname.substring(0, i);
 +            CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
 +            ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
 +            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
 +        }
 +        else
 +        {
 +            metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
 +        }
 +        return open(descriptor, metadata);
 +    }
 +
 +    public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
 +    {
 +        IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
 +                ? new LocalPartitioner(metadata.getKeyValidator())
 +                : StorageService.getPartitioner();
 +        return open(desc, componentsFor(desc), metadata, p);
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        return open(descriptor, components, metadata, partitioner, true);
 +    }
 +
 +    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
 +    {
 +        return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
 +    }
 +
 +    /**
 +     * Open SSTable reader to be used in batch mode(such as sstableloader).
 +     *
 +     * @param descriptor
 +     * @param components
 +     * @param metadata
 +     * @param partitioner
 +     * @return opened SSTableReader
 +     * @throws IOException
 +     */
 +    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // special implementation of load to use non-pooled SegmentedFile builders
 +        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
 +        SegmentedFile.Builder dbuilder = sstable.compression
 +                ? new CompressedSegmentedFile.Builder(null)
 +                : new BufferedSegmentedFile.Builder();
 +        if (!sstable.loadSummary(ibuilder, dbuilder))
 +            sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
 +        sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
 +        sstable.bf = FilterFactory.AlwaysPresent;
 +        sstable.setup();
 +        return sstable;
 +    }
 +
 +    private static SSTableReader open(Descriptor descriptor,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      boolean validate) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // load index and filter
 +        long start = System.nanoTime();
 +        sstable.load(validationMetadata);
 +        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +
 +        sstable.setup();
 +        if (validate)
 +            sstable.validate();
 +
 +        if (sstable.getKeyCache() != null)
 +            logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 +
 +        return sstable;
 +    }
 +
 +    public static void logOpenException(Descriptor descriptor, IOException e)
 +    {
 +        if (e instanceof FileNotFoundException)
 +            logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
 +        else
 +            logger.error("Corrupt sstable {}; skipped", descriptor, e);
 +    }
 +
 +    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
 +                                                    final CFMetaData metadata,
 +                                                    final IPartitioner partitioner)
 +    {
 +        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 +
 +        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
 +        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
 +        {
 +            Runnable runnable = new Runnable()
 +            {
 +                public void run()
 +                {
 +                    SSTableReader sstable;
 +                    try
 +                    {
 +                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
 +                    }
 +                    catch (IOException ex)
 +                    {
 +                        logger.error("Corrupt sstable {}; skipped", entry, ex);
 +                        return;
 +                    }
 +                    sstables.add(sstable);
 +                }
 +            };
 +            executor.submit(runnable);
 +        }
 +
 +        executor.shutdown();
 +        try
 +        {
 +            executor.awaitTermination(7, TimeUnit.DAYS);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +
 +        return sstables;
 +
 +    }
 +
 +    /**
 +     * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
 +     */
 +    public static SSTableReader internalOpen(Descriptor desc,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      SegmentedFile ifile,
 +                                      SegmentedFile dfile,
 +                                      IndexSummary isummary,
 +                                      IFilter bf,
 +                                      long maxDataAge,
 +                                      StatsMetadata sstableMetadata,
 +                                      OpenReason openReason)
 +    {
 +        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 +
 +        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +
 +        reader.bf = bf;
 +        reader.ifile = ifile;
 +        reader.dfile = dfile;
 +        reader.indexSummary = isummary;
 +        reader.setup();
 +
 +        return reader;
 +    }
 +
 +
 +    private static SSTableReader internalOpen(final Descriptor descriptor,
 +                                            Set<Component> components,
 +                                            CFMetaData metadata,
 +                                            IPartitioner partitioner,
 +                                            Long maxDataAge,
 +                                            StatsMetadata sstableMetadata,
 +                                            OpenReason openReason)
 +    {
 +        Factory readerFactory = descriptor.getFormat().getReaderFactory();
 +
 +        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    protected SSTableReader(final Descriptor desc,
 +                            Set<Component> components,
 +                            CFMetaData metadata,
 +                            IPartitioner partitioner,
 +                            long maxDataAge,
 +                            StatsMetadata sstableMetadata,
 +                            OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner);
 +        this.sstableMetadata = sstableMetadata;
 +        this.maxDataAge = maxDataAge;
 +        this.openReason = openReason;
 +        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
 +    }
 +
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sum += sstable.onDiskLength();
 +        }
 +        return sum;
 +    }
 +
 +    public boolean equals(Object that)
 +    {
 +        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
 +    }
 +
 +    public int hashCode()
 +    {
 +        return this.descriptor.hashCode();
 +    }
 +
 +    public String getFilename()
 +    {
 +        return dfile.path;
 +    }
 +
 +    public String getIndexFilename()
 +    {
 +        return ifile.path;
 +    }
 +
 +    public void setTrackedBy(DataTracker tracker)
 +    {
 +        tidy.type.deletingTask.setTracker(tracker);
 +        // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
 +        // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
 +        // here when we know we're being wired into the rest of the server infrastructure.
 +        keyCache = CacheService.instance.keyCache;
 +    }
 +
 +    private void load(ValidationMetadata validation) throws IOException
 +    {
 +        if (metadata.getBloomFilterFpChance() == 1.0)
 +        {
 +            // bf is disabled.
 +            load(false, true);
 +            bf = FilterFactory.AlwaysPresent;
 +        }
 +        else if (!components.contains(Component.FILTER) || validation == null)
 +        {
 +            // bf is enabled, but filter component is missing.
 +            load(true, true);
 +        }
 +        else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
 +        {
 +            // bf fp chance in sstable metadata and it has changed since compaction.
 +            load(true, true);
 +        }
 +        else
 +        {
 +            // bf is enabled and fp chance matches the currently configured value.
 +            load(false, true);
 +            loadBloomFilter();
 +        }
 +    }
 +
 +    /**
 +     * Load bloom filter from Filter.db file.
 +     *
 +     * @throws IOException
 +     */
 +    private void loadBloomFilter() throws IOException
 +    {
 +        DataInputStream stream = null;
 +        try
 +        {
 +            stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
 +            bf = FilterFactory.deserialize(stream, true);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(stream);
 +        }
 +    }
 +
 +    /**
 +     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
 +     * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
 +     *                             avoid persisting it to disk by setting this to false
 +     */
 +    private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
 +    {
 +        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +        SegmentedFile.Builder dbuilder = compression
 +                ? SegmentedFile.getCompressedBuilder()
 +                : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +
 +        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
 +        if (recreateBloomFilter || !summaryLoaded)
 +            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 +
 +        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 +        if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
 +            saveSummary(ibuilder, dbuilder);
 +    }
 +
 +    /**
 +     * Build index summary(and optionally bloom filter) by reading through Index.db file.
 +     *
 +     * @param recreateBloomFilter true if recreate bloom filter
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @param summaryLoaded true if index summary is already loaded and not need to build again
 +     * @throws IOException
 +     */
 +    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            long histogramCount = sstableMetadata.estimatedRowSize.count();
 +            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
 +                    ? histogramCount
 +                    : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
 +
 +            if (recreateBloomFilter)
 +                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
 +
-             IndexSummaryBuilder summaryBuilder = null;
-             if (!summaryLoaded)
-                 summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
- 
-             long indexPosition;
-             RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
- 
-             while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
++            try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel))
 +            {
-                 ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
-                 RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
-                 DecoratedKey decoratedKey = partitioner.decorateKey(key);
-                 if (first == null)
-                     first = decoratedKey;
-                 last = decoratedKey;
++                long indexPosition;
++                RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
 +
-                 if (recreateBloomFilter)
-                     bf.add(decoratedKey);
- 
-                 // if summary was already read from disk we don't want to re-populate it using primary index
-                 if (!summaryLoaded)
++                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +                {
-                     summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
-                     ibuilder.addPotentialBoundary(indexPosition);
-                     dbuilder.addPotentialBoundary(indexEntry.position);
++                    ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
++                    RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
++                    DecoratedKey decoratedKey = partitioner.decorateKey(key);
++                    if (first == null)
++                        first = decoratedKey;
++                    last = decoratedKey;
++
++                    if (recreateBloomFilter)
++                        bf.add(decoratedKey);
++
++                    // if summary was already read from disk we don't want to re-populate it using primary index
++                    if (!summaryLoaded)
++                    {
++                        summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
++                        ibuilder.addPotentialBoundary(indexPosition);
++                        dbuilder.addPotentialBoundary(indexEntry.position);
++                    }
 +                }
-             }
 +
-             if (!summaryLoaded)
-                 indexSummary = summaryBuilder.build(partitioner);
++                if (!summaryLoaded)
++                    indexSummary = summaryBuilder.build(partitioner);
++            }
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +
 +        first = getMinimalKey(first);
 +        last = getMinimalKey(last);
 +    }
 +
 +    /**
 +     * Load index summary from Summary.db file if it exists.
 +     *
 +     * if loaded index summary has different index interval from current value stored in schema,
 +     * then Summary.db file will be deleted and this returns false to rebuild summary.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @return true if index summary is loaded successfully from Summary.db file.
 +     */
 +    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (!summariesFile.exists())
 +            return false;
 +
 +        DataInputStream iStream = null;
 +        try
 +        {
 +            iStream = new DataInputStream(new FileInputStream(summariesFile));
 +            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
 +            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            ibuilder.deserializeBounds(iStream);
 +            dbuilder.deserializeBounds(iStream);
 +        }
 +        catch (IOException e)
 +        {
 +            if (indexSummary != null)
 +                indexSummary.close();
 +            logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
 +            // corrupted; delete it and fall back to creating a new summary
 +            FileUtils.closeQuietly(iStream);
 +            FileUtils.deleteWithConfirm(summariesFile);
 +            return false;
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(iStream);
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Save index summary to Summary.db file.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     */
 +    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        saveSummary(ibuilder, dbuilder, indexSummary);
 +    }
 +
 +    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (summariesFile.exists())
 +            FileUtils.deleteWithConfirm(summariesFile);
 +
 +        DataOutputStreamAndChannel oStream = null;
 +        try
 +        {
 +            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
 +            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
 +            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
 +            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
 +            ibuilder.serializeBounds(oStream);
 +            dbuilder.serializeBounds(oStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot save SSTable Summary: ", e);
 +
 +            // corrupted; delete it so a fresh summary is rebuilt on the next load
 +            if (summariesFile.exists())
 +                FileUtils.deleteWithConfirm(summariesFile);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(oStream);
 +        }
 +    }
 +
 +    public void setReplacedBy(SSTableReader replacement)
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert replacement != null;
 +            assert !tidy.isReplaced;
 +            assert tidy.global.live == this;
 +            tidy.isReplaced = true;
 +            tidy.global.live = replacement;
 +        }
 +    }
 +
 +    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
 +                                                          dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
 +                                                          maxDataAge, sstableMetadata, OpenReason.MOVED_START);
 +            // TODO: make data/index start accurate for compressed files
 +            // TODO: merge with caller's firstKeyBeyond() work, to save time
 +            if (newStart.compareTo(first) > 0)
 +            {
 +                final long dataStart = getPosition(newStart, Operator.EQ).position;
 +                final long indexStart = getIndexScanPosition(newStart);
 +                this.tidy.runOnClose = new Runnable()
 +                {
 +                    public void run()
 +                    {
 +                        CLibrary.trySkipCache(dfile.path, 0, dataStart);
 +                        CLibrary.trySkipCache(ifile.path, 0, indexStart);
 +                        if (runOnClose != null)
 +                            runOnClose.run();
 +                    }
 +                };
 +            }
 +
 +            replacement.first = newStart;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    public SSTableReader cloneAsShadowed(final Runnable runOnClose)
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +            this.tidy.runOnClose = new Runnable()
 +            {
 +                public void run()
 +                {
 +                    CLibrary.trySkipCache(dfile.path, 0, 0);
 +                    CLibrary.trySkipCache(ifile.path, 0, 0);
 +                    runOnClose.run();
 +                }
 +            };
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
 +                                                          dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
 +                                                          maxDataAge, sstableMetadata, OpenReason.SHADOWED);
 +            replacement.first = first;
 +            replacement.last = last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    /**
 +     * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
 +     * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
 +     * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
 +     * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
 +     * @return a new SSTableReader
 +     * @throws IOException
 +     */
 +    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +
 +            int minIndexInterval = metadata.getMinIndexInterval();
 +            int maxIndexInterval = metadata.getMaxIndexInterval();
 +            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 +
 +            IndexSummary newSummary;
 +            long oldSize = bytesOnDisk();
 +
 +            // We have to rebuild the summary from the on-disk primary index in three cases:
 +            // 1. The sampling level went up, so we need to read more entries off disk
 +            // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
 +            //    at full sampling (and consequently at any other sampling level)
 +            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
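 +            // e.g. (illustrative): doubling min_index_interval from 128 to 256 halves the entries present at
 +            // full sampling, so case 2 applies and the summary must be rebuilt from the on-disk primary index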
 +            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
 +            {
 +                newSummary = buildSummaryAtLevel(samplingLevel);
 +            }
 +            else if (samplingLevel < indexSummary.getSamplingLevel())
 +            {
 +                // we can use the existing index summary to make a smaller one
 +                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
 +
 +                SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +                SegmentedFile.Builder dbuilder = compression
 +                        ? SegmentedFile.getCompressedBuilder()
 +                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +                saveSummary(ibuilder, dbuilder, newSummary);
 +            }
 +            else
 +            {
 +                throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
 +                        "no adjustments to min/max_index_interval");
 +            }
 +
 +            long newSize = bytesOnDisk();
 +            StorageMetrics.load.inc(newSize - oldSize);
 +            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
 +                                                     dfile.sharedCopy(), newSummary, bf.sharedCopy(), maxDataAge,
 +                                                     sstableMetadata, OpenReason.METADATA_CHANGE);
 +            replacement.first = this.first;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning an mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
-             IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
- 
-             long indexPosition;
-             while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
++            try (IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel))
 +            {
-                 summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
-                 RowIndexEntry.Serializer.skip(primaryIndex);
-             }
++                long indexPosition;
++                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
++                {
++                    summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
++                    RowIndexEntry.Serializer.skip(primaryIndex);
++                }
 +
-             return summaryBuilder.build(partitioner);
++                return summaryBuilder.build(partitioner);
++            }
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +    }
 +
 +    public RestorableMeter getReadMeter()
 +    {
 +        return readMeter;
 +    }
 +
 +    public int getIndexSummarySamplingLevel()
 +    {
 +        return indexSummary.getSamplingLevel();
 +    }
 +
 +    public long getIndexSummaryOffHeapSize()
 +    {
 +        return indexSummary.getOffHeapSize();
 +    }
 +
 +    public int getMinIndexInterval()
 +    {
 +        return indexSummary.getMinIndexInterval();
 +    }
 +
 +    public double getEffectiveIndexInterval()
 +    {
 +        return indexSummary.getEffectiveIndexInterval();
 +    }
 +
 +    public void releaseSummary()
 +    {
 +        tidy.releaseSummary();
 +        indexSummary = null;
 +    }
 +
 +    private void validate()
 +    {
 +        if (this.first.compareTo(this.last) > 0)
 +        {
 +            selfRef().release();
 +            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
 +        }
 +    }
 +
 +    /**
 +     * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
 +     * modulo downsampling of the index summary). Always returns a value >= 0
 +     */
 +    public long getIndexScanPosition(RowPosition key)
 +    {
 +        if (openReason == OpenReason.MOVED_START && key.compareTo(first) < 0)
 +            key = first;
 +
 +        return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
 +    }
 +
 +    protected static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
 +    {
 +        if (binarySearchResult == -1)
 +            return 0;
 +        else
 +            return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
 +    }
 +
 +    protected static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
 +    {
 +        if (binarySearchResult < 0)
 +        {
 +            // binary search gives us the first index _greater_ than the key searched for,
 +            // i.e., its insertion position
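 +            // worked example (illustrative): a miss that would insert at position 1 is encoded as -2, so
 +            // greaterThan = 1 and we return 0, the last summary entry preceding the searched key; a miss
 +            // before the first entry is encoded as -1, giving greaterThan = 0, so we return -1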
 +            int greaterThan = (binarySearchResult + 1) * -1;
 +            if (greaterThan == 0)
 +                return -1;
 +            return greaterThan - 1;
 +        }
 +        else
 +        {
 +            return binarySearchResult;
 +        }
 +    }
 +
 +    /**
 +     * Returns the compression metadata for this sstable.
 +     * @throws IllegalStateException if the sstable is not compressed
 +     */
 +    public CompressionMetadata getCompressionMetadata()
 +    {
 +        if (!compression)
 +            throw new IllegalStateException(this + " is not compressed");
 +
 +        CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
 +
 +        //We need the parent cf metadata
 +        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
 +        cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
 +
 +        return cmd;
 +    }
 +
 +    /**
 +     * Returns the amount of memory in bytes used off heap by the compression meta-data.
 +     * @return the amount of memory in bytes used off heap by the compression meta-data
 +     */
 +    public long getCompressionMetadataOffHeapSize()
 +    {
 +        if (!compression)
 +            return 0;
 +
 +        return getCompressionMetadata().offHeapSize();
 +    }
 +
 +    /**
 +     * For testing purposes only.
 +     */
 +    public void forceFilterFailures()
 +    {
 +        bf = FilterFactory.AlwaysPresent;
 +    }
 +
 +    public IFilter getBloomFilter()
 +    {
 +        return bf;
 +    }
 +
 +    public long getBloomFilterSerializedSize()
 +    {
 +        return bf.serializedSize();
 +    }
 +
 +    /**
 +     * Returns the amount of memory in bytes used off heap by the bloom filter.
 +     * @return the amount of memory in bytes used off heap by the bloom filter
 +     */
 +    public long getBloomFilterOffHeapSize()
 +    {
 +        return bf.offHeapSize();
 +    }
 +
 +    /**
 +     * @return An estimate of the number of keys in this SSTable based on the index summary.
 +     */
 +    public long estimatedKeys()
 +    {
 +        return indexSummary.getEstimatedKeyCount();
 +    }
 +
 +    /**
 +     * @param ranges
 +     * @return An estimate of the number of keys for given ranges in this SSTable.
 +     */
 +    public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
 +    {
 +        long sampleKeyCount = 0;
 +        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
 +        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
 +            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
 +
 +        // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
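 +        // e.g. (illustrative): with min_index_interval = 128, BASE_SAMPLING_LEVEL = 128 and a current
 +        // samplingLevel of 64, each sampled entry stands in for 128 * 128 / 64 = 256 keys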
 +        long estimatedKeys = sampleKeyCount * ((long) Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
 +        return Math.max(1, estimatedKeys);
 +    }
 +
 +    /**
 +     * Returns the number of entries in the IndexSummary.  At full sampling, this is approximately 1/INDEX_INTERVALth of
 +     * the keys in this SSTable.
 +     */
 +    public int getIndexSummarySize()
 +    {
 +        return indexSummary.size();
 +    }
 +
 +    /**
 +     * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
 +     */
 +    public int getMaxIndexSummarySize()
 +    {
 +        return indexSummary.getMaxNumberOfEntries();
 +    }
 +
 +    /**
 +     * Returns the key for the index summary entry at `index`.
 +     */
 +    public byte[] getIndexSummaryKey(int index)
 +    {
 +        return indexSummary.getKey(index);
 +    }
 +
 +    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Integer,Integer>> positions = new ArrayList<>();
 +
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            RowPosition leftPosition = range.left.maxKeyBound();
 +            RowPosition rightPosition = range.right.maxKeyBound();
 +
 +            int left = summary.binarySearch(leftPosition);
 +            if (left < 0)
 +                left = (left + 1) * -1;
 +            else
 +                // ranges are start-exclusive on the left, so skip an exact match
 +                left = left + 1;
 +            if (left == summary.size())
 +                // left is past the end of the sampling
 +                continue;
 +
 +            int right = Range.isWrapAround(range.left, range.right)
 +                    ? summary.size() - 1
 +                    : summary.binarySearch(rightPosition);
 +            if (right < 0)
 +            {
 +                // ranges are end-inclusive, so we use the index before the one binarySearch gives us,
 +                // since that will be the last index we will return
 +                right = (right + 1) * -1;
 +                if (right == 0)
 +                    // Means the first key is already strictly greater than the right bound
 +                    continue;
 +                right--;
 +            }
 +
 +            if (left > right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
 +    {
 +        final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
 +
 +        if (indexRanges.isEmpty())
 +            return Collections.emptyList();
 +
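 +        // lazily walk each (left, right) summary-index range in order, decorating each sampled key on demand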
 +        return new Iterable<DecoratedKey>()
 +        {
 +            public Iterator<DecoratedKey> iterator()
 +            {
 +                return new Iterator<DecoratedKey>()
 +                {
 +                    private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
 +                    private Pair<Integer, Integer> current;
 +                    private int idx;
 +
 +                    public boolean hasNext()
 +                    {
 +                        if (current == null || idx > current.right)
 +                        {
 +                            if (rangeIter.hasNext())
 +                            {
 +                                current = rangeIter.next();
 +                                idx = current.left;
 +                                return true;
 +                            }
 +                            return false;
 +                        }
 +
 +                        return true;
 +                    }
 +
 +                    public DecoratedKey next()
 +                    {
 +                        byte[] bytes = indexSummary.getKey(idx++);
 +                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
 +                    }
 +
 +                    public void remove()
 +                    {
 +                        throw new UnsupportedOperationException();
 +                    }
 +                };
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
 +     * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
 +     */
 +    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Long,Long>> positions = new ArrayList<>();
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            assert !range.isWrapAround() || range.right.isMinimum();
 +            // truncate the range so it at most covers the sstable
 +            AbstractBounds<RowPosition> bounds = range.toRowBounds();
 +            RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
 +            RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
 +
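 +            // skip ranges that fall entirely outside this sstable's [first, last] key span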
 +            if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0)
 +                continue;
 +
 +            long left = getPosition(leftBound, Operator.GT).position;
 +            long right = (rightBound.compareTo(last) > 0)
 +                         ? uncompressedLength()
 +                         : getPosition(rightBound, Operator.GT).position;
 +
 +            if (left == right)
 +                // empty range
 +                continue;
 +
 +            assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right);
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public void invalidateCacheKey(DecoratedKey key)
 +    {
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        keyCache.remove(cacheKey);
 +    }
 +
 +    public void cacheKey(DecoratedKey key, RowIndexEntry info)
 +    {
 +        CachingOptions caching = metadata.getCaching();
 +
 +        if (!caching.keyCache.isEnabled()
 +                || keyCache == null
 +                || keyCache.getCapacity() == 0)
 +        {
 +            return;
 +        }
 +
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
 +        keyCache.put(cacheKey, info);
 +    }
 +
 +    public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
 +    {
 +        return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
 +    }
 +
 +    protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
 +    {
 +        if (keyCache != null && keyCache.getCapacity() > 0)
 +        {
 +            if (updateStats)
 +            {
 +                RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
 +                keyCacheRequest.incrementAndGet();
 +                if (cachedEntry != null)
 +                {
 +                    keyCacheHit.incrementAndGet();
 +                    bloomFilterTracker.addTruePositive();
 +                }
 +                return cachedEntry;
 +            }
 +            else
 +            {
 +                return keyCache.getInternal(unifiedKey);
 +            }
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Get position updating key cache and stats.
 +     * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
 +     */
 +    public RowIndexEntry getPosition(RowPosition key, Operator op)
 +    {
 +        return getPosition(key, op, true, false);
 +    }
 +
 +    public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats)
 +    {
 +        return getPosition(key, op, updateCacheAndStats, false);
 +    }
 +
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key may be passed to
 +     * select keys by token bounds, but only if op != EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    protected abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
 +
 +    //Corresponds to a name column
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
 +
 +    //Corresponds to a slice query
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
 +
 +    /**
 +     * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
 +     */
 +    public DecoratedKey firstKeyBeyond(RowPosition token)
 +    {
 +        if (token.compareTo(first) < 0)
 +            return first;
 +
 +        long sampledPosition = getIndexScanPosition(token);
 +
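 +        // scan the primary index forward from the nearest preceding summary entry until we pass the token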
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext())
 +        {
 +            FileDataInput in = segments.next();
 +            try
 +            {
 +                while (!in.isEOF())
 +                {
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                    if (indexDecoratedKey.compareTo(token) > 0)
 +                        return indexDecoratedKey;
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, in.getPath());
 +            }
 +            finally
 +            {
 +                FileUtils.closeQuietly(in);
 +            }
 +        }
 +
 +        return null;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the data for this SSTable. For
 +     * compressed files, this is not the same thing as the on disk size (see
 +     * onDiskLength())
 +     */
 +    public long uncompressedLength()
 +    {
 +        return dfile.length;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the on disk size for this SSTable. For
 +     * compressed files, this is not the same thing as the data length (see
 +     * length())
 +     */
 +    public long onDiskLength()
 +    {
 +        return dfile.onDiskLength;
 +    }
 +
 +    /**
 +     * Mark the sstable as obsolete, i.e., compacted into newer sstables.
 +     *
 +     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
 +     * except for threads holding a reference.
 +     *
 +     * @return true if this is the first time the file was marked obsolete.  Calling this
 +     * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
 +     */
 +    public boolean markObsolete()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} compacted", getFilename());
 +
 +        synchronized (tidy.global)
 +        {
 +            assert !tidy.isReplaced;
 +        }
 +        return !tidy.global.isCompacted.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedCompacted()
 +    {
 +        return tidy.global.isCompacted.get();
 +    }
 +
 +    public void markSuspect()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
 +
 +        isSuspect.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedSuspect()
 +    {
 +        return isSuspect.get();
 +    }
 +
 +    /**
 +     * I/O SSTableScanner
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner()
 +    {
 +        return getScanner((RateLimiter) null);
 +    }
 +
 +    public ISSTableScanner getScanner(RateLimiter limiter)
 +    {
 +        return getScanner(DataRange.allData(partitioner), limiter);
 +    }
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(DataRange dataRange)
 +    {
 +        return getScanner(dataRange, null);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined range of tokens.
 +     *
 +     * @param range the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(Range<Token> range, RateLimiter limiter)
 +    {
 +        if (range == null)
 +            return getScanner(limiter);
 +        return getScanner(Collections.singletonList(range), limiter);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter);
 +
 +    public FileDataInput getFileDataInput(long position)
 +    {
 +        return dfile.getSegment(position);
 +    }
 +
 +    /**
 +     * Tests if the sstable contains data newer than the given age param (in local host current-millisecond time).
 +     * This works in conjunction with maxDataAge, which is an upper bound on the creation time of any data in this sstable.
 +     * @param age The age to compare against the maxDataAge of this sstable, in milliseconds since the epoch on this host
 +     * @return True iff this sstable contains data that's newer than the given age parameter.
 +     */
 +    public boolean newSince(long age)
 +    {
 +        return maxDataAge > age;
 +    }
 +
 +    public void createLinks(String snapshotDirectoryPath)
 +    {
 +        for (Component component : components)
 +        {
 +            File sourceFile = new File(descriptor.filenameFor(component));
 +            File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
 +            FileUtils.createHardLink(sourceFile, targetLink);
 +        }
 +    }
 +
 +    public boolean isRepaired()
 +    {
 +        return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
 +    }
 +
 +    public SSTableReader getCurrentReplacement()
 +    {
 +        return tidy.global.live;
 +    }
 +
 +    /**
 +     * TODO: Move someplace reusable
 +     */
 +    public abstract static class Operator
 +    {
 +        public static final Operator EQ = new Equals();
 +        public static final Operator GE = new GreaterThanOrEqualTo();
 +        public static final Operator GT = new GreaterThan();
 +
 +        /**
 +         * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
 +         * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
 +         */
 +        public abstract int apply(int comparison);
 +
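 +        // illustrative use: an index scan compares each candidate key against the target and passes the
 +        // result to apply(); 0 accepts the candidate, > 0 keeps scanning forward, and < 0 aborts the scan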
 +        final static class Equals extends Operator
 +        {
 +            public int apply(int comparison) { return -comparison; }
 +        }
 +
 +        final static class GreaterThanOrEqualTo extends Operator
 +        {
 +            public int apply(int comparison) { return comparison >= 0 ? 0 : 1; }
 +        }
 +
 +        final static class GreaterThan extends Operator
 +        {
 +            public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
 +        }
 +    }
 +
 +    public long getBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getFalsePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentFalsePositiveCount();
 +    }
 +
 +    public long getBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getTruePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentTruePositiveCount();
 +    }
 +
 +    public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
 +    {
 +        return keyCache;
 +    }
 +
 +    public EstimatedHistogram getEstimatedRowSize()
 +    {
 +        return sstableMetadata.estimatedRowSize;
 +    }
 +
 +    public EstimatedHistogram getEstimatedColumnCount()
 +    {
 +        return sstableMetadata.estimatedColumnCount;
 +    }
 +
 +    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
 +    {
 +        return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
 +    }
 +
 +    public double getDroppableTombstonesBefore(int gcBefore)
 +    {
 +        return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
 +    }
 +
 +    public double getCompressionRatio()
 +    {
 +        return sstableMetadata.compressionRatio;
 +    }
 +
 +    public ReplayPosition getReplayPosition()
 +    {
 +        return sstableMetadata.replayPosition;
 +    }
 +
 +    public long getMinTimestamp()
 +    {
 +        return sstableMetadata.minTimestamp;
 +    }
 +
 +    public long getMaxTimestamp()
 +    {
 +        return sstableMetadata.maxTimestamp;
 +    }
 +
 +    public Set<Integer> getAncestors()
 +    {
 +        try
 +        {
 +            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
 +            return compactionMetadata.ancestors;
 +        }
 +        catch (IOException e)
 +        {
 +            SSTableReader.logOpenException(descriptor, e);
 +            return Collections.emptySet();
 +        }
 +    }
 +
 +    public int getSSTableLevel()
 +    {
 +        return sstableMetadata.sstableLevel;
 +    }
 +
 +    /**
 +     * Reloads the sstable metadata from disk.
 +     *
 +     * Called after level is changed on sstable, for example if the sstable is dropped to L0
 +     *
 +     * Might be possible to remove in future versions
 +     *
 +     * @throws IOException
 +     */
 +    public void reloadSSTableMetadata() throws IOException
 +    {
 +        this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
 +    }
 +
 +    public StatsMetadata getSSTableMetadata()
 +    {
 +        return sstableMetadata;
 +    }
 +
 +    public RandomAccessReader openDataReader(RateLimiter limiter)
 +    {
 +        assert limiter != null;
 +        return dfile.createThrottledReader(limiter);
 +    }
 +
 +    public RandomAccessReader openDataReader()
 +    {
 +        return dfile.createReader();
 +    }
 +
 +    public RandomAccessReader openIndexReader()
 +    {
 +        return ifile.createReader();
 +    }
 +
 +    /**
 +     * @param component the component whose last modified time to get.
 +     * @return the last modified time for the given component, or 0 if the component does not exist or an IO error occurs.
 +     */
 +    public long getCreationTimeFor(Component component)
 +    {
 +        return new File(descriptor.filenameFor(component)).lastModified();
 +    }
 +
 +    /**
 +     * @return Number of key cache hits
 +     */
 +    public long getKeyCacheHit()
 +    {
 +        return keyCacheHit.get();
 +    }
 +
 +    /**
 +     * @return Number of key cache requests
 +     */
 +    public long getKeyCacheRequest()
 +    {
 +        return keyCacheRequest.get();
 +    }
 +
 +    /**
 +     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
 +     * slice queries, row cache hits, or non-query reads, like compaction.
 +     */
 +    public void incrementReadCount()
 +    {
 +        if (readMeter != null)
 +            readMeter.mark();
 +    }
 +
 +    public static class SizeComparator implements Comparator<SSTableReader>
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
 +        }
 +    }
 +
 +    public Ref<SSTableReader> tryRef()
 +    {
 +        return selfRef.tryRef();
 +    }
 +
 +    public Ref<SSTableReader> selfRef()
 +    {
 +        return selfRef;
 +    }
 +
 +    public Ref<SSTableReader> ref()
 +    {
 +        return selfRef.ref();
 +    }
 +
 +    void setup()
 +    {
 +        tidy.setup(this);
 +        this.readMeter = tidy.global.readMeter;
 +    }
 +
 +    @VisibleForTesting
 +    public void overrideReadMeter(RestorableMeter readMeter)
 +    {
 +        this.readMeter = tidy.global.readMeter = readMeter;
 +    }
 +
 +    /**
 +     * One instance per SSTableReader we create. This references the type-shared tidy, which in turn references
 +     * the globally shared tidy, i.e.
 +     *
 +     * InstanceTidier => DescriptorTypeTidy => GlobalTidy
 +     *
 +     * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START, for example),
 +     * but there can be at most two DescriptorTypeTidy instances (FINAL and TEMPLINK) and only one GlobalTidy for a
 +     * single logical sstable.
 +     *
 +     * When an InstanceTidier cleans up, it releases its reference to its DescriptorTypeTidy; when all InstanceTidiers
 +     * for that type have run, the DescriptorTypeTidy cleans up. DescriptorTypeTidy behaves in the same way towards GlobalTidy.
 +     *
 +     * For ease, we stash a direct reference to both our type-shared and global tidier
 +     */
 +    private static final class InstanceTidier implements Tidy
 +    {
 +        private final Descriptor descriptor;
 +        private final CFMetaData metadata;
 +        private IFilter bf;
 +        private IndexSummary summary;
 +
 +        private SegmentedFile dfile;
 +        private SegmentedFile ifile;
 +        private Runnable runOnClose;
 +        private boolean isReplaced = false;
 +
 +        // a reference to our shared per-Descriptor.Type tidy instance, that
 +        // we will release when we are ourselves released
 +        private Ref<DescriptorTypeTidy> typeRef;
 +
 +        // a convenience stashing of the shared per-descriptor-type tidy instance itself
 +        // and the per-logical-sstable globally shared state that it is linked to
 +        private DescriptorTypeTidy type;
 +        private GlobalTidy global;
 +
 +        private boolean setup;
 +
 +        void setup(SSTableReader reader)
 +        {
 +            this.setup = true;
 +            this.bf = reader.bf;
 +            this.summary = reader.indexSummary;
 +            this.dfile = reader.dfile;
 +            this.ifile = reader.ifile;
 +            // get a new reference to the shared descriptor-type tidy
 +            this.typeRef = DescriptorTypeTidy.get(reader);
 +            this.type = typeRef.get();
 +            this.global = type.globalRef.get();
 +        }
 +
 +        InstanceTidier(Descriptor descriptor, CFMetaData metadata)
 +        {
 +            this.descriptor = descriptor;
 +            this.metadata = metadata;
 +        }
 +
 +        public void tidy()
 +        {
 +            // don't try to cleanup if the sstablereader was never fully constructed
 +            if (!setup)
 +                return;
 +
 +            final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
 +            final OpOrder.Barrier barrier;
 +            if (cfs != null)
 +            {
 +                barrier = cfs.readOrdering.newBarrier();
 +                barrier.issue();
 +            }
 +            else
 +                barrier = null;
 +
 +            ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
 +            {
 +                public void run()
 +                {
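 +                    // wait for any in-flight reads begun before the barrier was issued to complete
 +                    // before we release the underlying resources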
 +                    if (barrier != null)
 +                        barrier.await();
 +                    bf.close();
 +                    dfile.close();
 +                    ifile.close();
 +                    if (summary != null)
 +                        summary.close();
 +                    if (runOnClose != null)
 +                        runOnClose.run();
 +                    typeRef.release();
 +                }
 +            });
 +        }
 +
 +        public String name()
 +        {
 +            return descriptor.toString();
 +        }
 +
 +        void releaseSummary()
 +        {
 +            summary.close();
 +            assert summary.isCleanedUp();
 +            summary = null;
 +        }
 +    }
 +
 +    /**
 +     * One shared between all instances of a given Descriptor.Type.
 +     * It does only two things: deletes the sstable files for the type, if necessary,
 +     * and holds the shared reference to the globally shared state.
 +     *
 +     * All InstanceTidiers, on setup(), ask the static get() method for their shared state,
 +     * and stash a reference to it to be released when they are. Once all such references are
 +     * released, the shared tidy will be performed.
 +     */
 +    static final class DescriptorTypeTidy implements Tidy
 +    {
 +        // keyed by REAL descriptor (TMPLINK/FINAL), mapping to the shared DescriptorTypeTidy for that descriptor
 +        static final ConcurrentMap<Descriptor, Ref<DescriptorTypeTidy>> lookup = new ConcurrentHashMap<>();
 +
 +        private final Descriptor desc;
 +        private final Ref<GlobalTidy> globalRef;
 +        private final SSTableDeletingTask deletingTask;
 +
 +        DescriptorTypeTidy(Descriptor desc, SSTableReader sstable)
 +        {
 +            this.desc = desc;
 +            this.deletingTask = new SSTableDeletingTask(desc, sstable);
 +            // get a new reference to the shared global tidy
 +            this.globalRef = GlobalTidy.get(sstable);
 +        }
 +
 +        public void tidy()
 +        {
 +            lookup.remove(desc);
 +            boolean isCompacted = globalRef.get().isCompacted.get();
 +            globalRef.release();
 +            switch (desc.type)
 +            {
 +                case FINAL:
 +                    if (isCompacted)
 +                        deletingTask.run();
 +                    break;
 +                case TEMPLINK:
 +                    deletingTask.run();
 +                    break;
 +                default:
 +                    throw new IllegalStateException();
 +            }
 +        }
 +
 +        public String name()
 +        {
 +            return desc.toString();
 +        }
 +
 +        // get a new reference to the shared DescriptorTypeTidy for this sstable
 +        public static Ref<DescriptorTypeTidy> get(SSTableReader sstable)
 +        {
 +            Descriptor desc = sstable.descriptor;
 +            if (sstable.openReason == OpenReason.EARLY)
 +                desc = desc.asType(Descriptor.Type.TEMPLINK);
 +            Ref<DescriptorTypeTidy> refc = lookup.get(desc);
 +            if (refc != null)
 +                return refc.ref();
 +            final DescriptorTypeTidy tidy = new DescriptorTypeTidy(desc, sstable);
 +            refc = new Ref<>(tidy, tidy);
 +            Ref<?> ex = lookup.putIfAbsent(desc, refc);
 +            assert ex == null;
 +            return refc;
 +        }
 +    }
 +
 +    /**
 +     * One instance per logical sstable. This tracks both shared cleanup and some shared state related
 +     * to the sstable's lifecycle. All DescriptorTypeTidy instances, on construction, obtain a reference to us
 +     * via our static get(). There should only ever be at most two such references extant at any one time,
 +     * since only TMPLINK and FINAL type descriptors should be open as readers. When all files of both
 +     * kinds have been released, this shared tidy will be performed.
 +     */
 +    static final class GlobalTidy implements Tidy
 +    {
 +        // keyed by FINAL descriptor, mapping to the shared GlobalTidy for that descriptor
 +        static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>();
 +
 +        private final Descriptor desc;
 +        // a single convenience property for getting the most recent version of an sstable, not related to tidying
 +        private SSTableReader live;
 +        // the readMeter that is shared between all instances of the sstable, and can be overridden in all of them
 +        // at once also, for testing purposes
 +        private RestorableMeter readMeter;
 +        // the scheduled persistence of the readMeter, that we will cancel once all instances of this logical
 +        // sstable have been released
 +        private final ScheduledFuture readMeterSyncFuture;
 +        // shared state managing if the logical sstable has been compacted; this is used in cleanup both here
 +        // and in the FINAL type tidier
 +        private final AtomicBoolean isCompacted;
 +
 +        GlobalTidy(final SSTableReader reader)
 +        {
 +            this.desc = reader.descriptor;
 +            this.isCompacted = new AtomicBoolean();
 +            this.live = reader;
 +            // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
 +            // the read meter when in client mode.
 +            if (SystemKeyspace.NAME.equals(desc.ksname))
 +            {
 +                readMeter = null;
 +                readMeterSyncFuture = null;
 +                return;
 +            }
 +
 +            readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +            // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
 +            readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    if (!isCompacted.get())
 +                    {
 +                        meterSyncThrottle.acquire();
 +                        SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
 +                    }
 +                }
 +            }, 1, 5, TimeUnit.MINUTES);
 +        }
 +
 +        public void tidy()
 +        {
 +            lookup.remove(desc);
 +            if (readMeterSyncFuture != null)
 +                readMeterSyncFuture.cancel(true);
 +            if (isCompacted.get())
 +                SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +            // ideally we don't want to dropPageCache for the file until all instances have been released
 +            dropPageCache(desc.filenameFor(Component.DATA));
 +            dropPageCache(desc.filenameFor(Component.PRIMARY_INDEX));
 +        }
 +
 +        public String name()
 +        {
 +            return desc.toString();
 +        }
 +
 +        // get a new reference to the shared GlobalTidy for this sstable
 +        public static Ref<GlobalTidy> get(SSTableReader sstable)
 +        {
 +            Descriptor descriptor = sstable.descriptor;
 +            Ref<GlobalTidy> refc = lookup.get(descriptor);
 +            if (refc != null)
 +                return refc.ref();
 +            final GlobalTidy tidy = new GlobalTidy(sstable);
 +            refc = new Ref<>(tidy, tidy);
 +            Ref<?> ex = lookup.putIfAbsent(descriptor, refc);
 +            assert ex == null;
 +            return refc;
 +        }
 +    }
 +
 +    private static void dropPageCache(String filePath)
 +    {
 +        RandomAccessFile file = null;
 +
 +        try
 +        {
 +            file = new RandomAccessFile(filePath, "r");
 +
 +            int fd = CLibrary.getfd(file.getFD());
 +
 +            if (fd > 0)
 +            {
 +                if (logger.isDebugEnabled())
 +                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
 +
 +                CLibrary.trySkipCache(fd, 0, 0);
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            // we don't care if cache cleanup fails
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(file);
 +        }
 +    }
 +
 +    public static abstract class Factory
 +    {
 +        public abstract SSTableReader open(final Descriptor descriptor,
 +                                           Set<Component> components,
 +                                           CFMetaData metadata,
 +                                           IPartitioner partitioner,
 +                                           Long maxDataAge,
 +                                           StatsMetadata sstableMetadata,
 +                                           OpenReason openReason);
 +
 +    }
 +}