Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/12 14:39:53 UTC

[2/3] cassandra git commit: Make SSTableWriter.openEarly more robust and obvious

Make SSTableWriter.openEarly more robust and obvious

patch by benedict; reviewed by marcus for CASSANDRA-8747


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4eb9fa78
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4eb9fa78
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4eb9fa78

Branch: refs/heads/trunk
Commit: 4eb9fa78bd233f5f9b901dd677636842b351330b
Parents: f57ec8c
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Feb 12 13:35:23 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Feb 12 13:35:23 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../io/compress/CompressedSequentialWriter.java |   2 +
 .../io/sstable/IndexSummaryBuilder.java         | 162 ++++++++++++++-----
 .../cassandra/io/sstable/SSTableWriter.java     |  76 +++++----
 .../cassandra/io/util/SequentialWriter.java     |  10 ++
 5 files changed, 168 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2466b19..cbb4334 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
  * Enforce SSTableReader.first/last (CASSANDRA-8744)
  * Cleanup SegmentedFile API (CASSANDRA-8749)
  * Avoid overlap with early compaction replacement (CASSANDRA-8683)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 81bb3e9..87eb2fb 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -138,6 +138,8 @@ public class CompressedSequentialWriter extends SequentialWriter
 
         // next chunk should be written right after current + length of the checksum (int)
         chunkOffset += compressedLength + 4;
+        if (runPostFlush != null)
+            runPostFlush.run();
     }
 
     public CompressionMetadata open(SSTableWriter.FinishType finishType)

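Aside: the hook invocation added above is necessary because CompressedSequentialWriter overrides
flushData() wholesale, so the callback fired by the base SequentialWriter implementation (see the
final hunk below) would otherwise be skipped for compressed writers. A minimal sketch of the
pattern, with invented class names, not the committed code:

    // hypothetical sketch of the post-flush hook pattern
    abstract class BufferedWriter
    {
        protected Runnable runPostFlush; // registered at most once

        protected void flushData()
        {
            // ... write buffered bytes to disk ...
            if (runPostFlush != null)
                runPostFlush.run();
        }
    }

    class CompressingWriter extends BufferedWriter
    {
        @Override
        protected void flushData()
        {
            // ... compress, write chunk and checksum ...
            // the hook must be re-invoked here: the base implementation is never reached
            if (runPostFlush != null)
                runPostFlush.run();
        }
    }
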
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index df326d7..3b93b31 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.io.sstable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +47,41 @@ public class IndexSummaryBuilder
     private long keysWritten = 0;
     private long indexIntervalMatches = 0;
     private long offheapSize = 0;
+    private long nextSamplePosition;
+
+    // for each ReadableBoundary, we map its dataLength to the boundary itself, permitting us to look up
+    // the last readable boundary from the perspective of the data file
+    // [data file position limit] => [ReadableBoundary]
+    private TreeMap<Long, ReadableBoundary> lastReadableByData = new TreeMap<>();
+    // for each ReadableBoundary, we map its indexLength to the boundary itself, permitting us to look up
+    // the last readable boundary from the perspective of the index file
+    // [index file position limit] => [ReadableBoundary]
+    private TreeMap<Long, ReadableBoundary> lastReadableByIndex = new TreeMap<>();
+    // the last synced data file position
+    private long dataSyncPosition;
+    // the last synced index file position
+    private long indexSyncPosition;
+
+    // the last summary interval boundary that is fully readable in both data and index files
+    private ReadableBoundary lastReadableBoundary;
+
+    /**
+     * Represents a boundary that is guaranteed to be fully readable in the summary, index file and data file.
+     * The contained key is the last key that is readable once the index and data files have been flushed
+     * to the stored lengths.
+     */
+    public static class ReadableBoundary
+    {
+        final DecoratedKey lastKey;
+        final long indexLength;
+        final long dataLength;
+        public ReadableBoundary(DecoratedKey lastKey, long indexLength, long dataLength)
+        {
+            this.lastKey = lastKey;
+            this.indexLength = indexLength;
+            this.dataLength = dataLength;
+        }
+    }
 
     public IndexSummaryBuilder(long expectedKeys, int minIndexInterval, int samplingLevel)
     {
@@ -71,74 +108,113 @@ public class IndexSummaryBuilder
         maxExpectedEntries = (maxExpectedEntries * samplingLevel) / BASE_SAMPLING_LEVEL;
         positions = new ArrayList<>((int)maxExpectedEntries);
         keys = new ArrayList<>((int)maxExpectedEntries);
+        // if we're downsampling we may not use index 0
+        setNextSamplePosition(-minIndexInterval);
     }
 
-    // finds the last (-offset) decorated key that can be guaranteed to occur fully in the index file before the provided file position
-    public DecoratedKey getMaxReadableKey(long position, int offset)
+    // the index file has been flushed to the provided position; stash it and use that to recalculate our max readable boundary
+    public void markIndexSynced(long upToPosition)
     {
-        int i = Collections.binarySearch(positions, position);
-        if (i < 0)
-        {
-            i = -1 - i;
-            if (i == positions.size())
-                i -= 2;
-            else
-                i -= 1;
-        }
-        else
-            i -= 1;
-        i -= offset;
-        // we don't want to return any key if there's only 1 item in the summary, to make sure the sstable range is non-empty
-        if (i <= 0)
-            return null;
-        return keys.get(i);
+        indexSyncPosition = upToPosition;
+        refreshReadableBoundary();
     }
 
-    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexPosition)
+    // the data file has been flushed to the provided position; stash it and use that to recalculate our max readable boundary
+    public void markDataSynced(long upToPosition)
     {
-        if (keysWritten % minIndexInterval == 0)
-        {
-            // see if we should skip this key based on our sampling level
-            boolean shouldSkip = false;
-            for (int start : startPoints)
-            {
-                if ((indexIntervalMatches - start) % BASE_SAMPLING_LEVEL == 0)
-                {
-                    shouldSkip = true;
-                    break;
-                }
-            }
+        dataSyncPosition = upToPosition;
+        refreshReadableBoundary();
+    }
 
-            if (!shouldSkip)
-            {
-                keys.add(getMinimalKey(decoratedKey));
-                offheapSize += decoratedKey.getKey().remaining();
-                positions.add(indexPosition);
-                offheapSize += TypeSizes.NATIVE.sizeof(indexPosition);
-            }
+    private void refreshReadableBoundary()
+    {
+        // grab the last boundary at or before the synced position in each of the data and index files
+        Map.Entry<?, ReadableBoundary> byData = lastReadableByData.floorEntry(dataSyncPosition);
+        Map.Entry<?, ReadableBoundary> byIndex = lastReadableByIndex.floorEntry(indexSyncPosition);
+        if (byData == null || byIndex == null)
+            return;
+
+        // take the lower of the two, and stash it
+        lastReadableBoundary = byIndex.getValue().indexLength < byData.getValue().indexLength
+                               ? byIndex.getValue() : byData.getValue();
+
+        // clear our data prior to this, since we no longer need it
+        lastReadableByData.headMap(lastReadableBoundary.dataLength, false).clear();
+        lastReadableByIndex.headMap(lastReadableBoundary.indexLength, false).clear();
+    }
 
-            indexIntervalMatches++;
+    public ReadableBoundary getLastReadableBoundary()
+    {
+        return lastReadableBoundary;
+    }
+
+    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart)
+    {
+        return maybeAddEntry(decoratedKey, indexStart, 0, 0);
+    }
+
+    /**
+     *
+     * @param decoratedKey the key for this record
+     * @param indexStart the position in the index file this record begins
+     * @param indexEnd the position in the index file (exclusive) we must be able to read up to in order to read this record
+     * @param dataEnd the position in the data file (exclusive) we must be able to read up to in order to read this record;
+     *                a value of 0 indicates we are not tracking readable boundaries
+     */
+    public IndexSummaryBuilder maybeAddEntry(DecoratedKey decoratedKey, long indexStart, long indexEnd, long dataEnd)
+    {
+        if (keysWritten == nextSamplePosition)
+        {
+            keys.add(getMinimalKey(decoratedKey));
+            offheapSize += decoratedKey.getKey().remaining();
+            positions.add(indexStart);
+            offheapSize += TypeSizes.NATIVE.sizeof(indexStart);
+            setNextSamplePosition(keysWritten);
+        }
+        else if (dataEnd != 0 && keysWritten + 1 == nextSamplePosition)
+        {
+            // this is the last key in this summary interval, so stash it
+            ReadableBoundary boundary = new ReadableBoundary(decoratedKey, indexEnd, dataEnd);
+            lastReadableByData.put(dataEnd, boundary);
+            lastReadableByIndex.put(indexEnd, boundary);
         }
         keysWritten++;
 
         return this;
     }
 
+    // calculate the position of the next key we will store in our summary
+    private void setNextSamplePosition(long position)
+    {
+        tryAgain: while (true)
+        {
+            position += minIndexInterval;
+            long test = indexIntervalMatches++;
+            for (int start : startPoints)
+                if ((test - start) % BASE_SAMPLING_LEVEL == 0)
+                    continue tryAgain;
+
+            nextSamplePosition = position;
+            return;
+        }
+    }
+
     public IndexSummary build(IPartitioner partitioner)
     {
         return build(partitioner, null);
     }
 
-    public IndexSummary build(IPartitioner partitioner, DecoratedKey exclusiveUpperBound)
+    // lastIntervalKey should come from getLastReadableBoundary().lastKey
+    public IndexSummary build(IPartitioner partitioner, DecoratedKey lastIntervalKey)
     {
         assert keys.size() > 0;
         assert keys.size() == positions.size();
 
         int length;
-        if (exclusiveUpperBound == null)
+        if (lastIntervalKey == null)
             length = keys.size();
-        else
-            length = Collections.binarySearch(keys, exclusiveUpperBound);
+        else // since it's an inclusive upper bound, this should never match exactly
+            length = -1 - Collections.binarySearch(keys, lastIntervalKey);
 
         assert length > 0;
 

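The two TreeMaps introduced above implement a simple idea: index every candidate boundary by the
file length required to read it, so that TreeMap.floorEntry(flushedLength) yields the last
candidate that is fully durable. A self-contained sketch of the technique, under invented names
(BoundaryTracker, Boundary), not the committed class:

    import java.util.Map;
    import java.util.TreeMap;

    class BoundaryTracker
    {
        static class Boundary
        {
            final long indexLength, dataLength;
            Boundary(long indexLength, long dataLength)
            {
                this.indexLength = indexLength;
                this.dataLength = dataLength;
            }
        }

        private final TreeMap<Long, Boundary> byData = new TreeMap<>();
        private final TreeMap<Long, Boundary> byIndex = new TreeMap<>();
        private long dataSynced, indexSynced;
        private Boundary lastReadable;

        void add(Boundary boundary)
        {
            byData.put(boundary.dataLength, boundary);
            byIndex.put(boundary.indexLength, boundary);
        }

        void markDataSynced(long position)  { dataSynced = position;  refresh(); }
        void markIndexSynced(long position) { indexSynced = position; refresh(); }

        private void refresh()
        {
            // floorEntry: the last boundary whose required file length has been flushed
            Map.Entry<Long, Boundary> byD = byData.floorEntry(dataSynced);
            Map.Entry<Long, Boundary> byI = byIndex.floorEntry(indexSynced);
            if (byD == null || byI == null)
                return; // no boundary is durable in both files yet

            // a boundary readable in both files is the earlier of the two candidates
            lastReadable = byI.getValue().indexLength < byD.getValue().indexLength
                           ? byI.getValue() : byD.getValue();

            // everything before the chosen boundary is superseded and can be dropped
            byData.headMap(lastReadable.dataLength, false).clear();
            byIndex.headMap(lastReadable.indexLength, false).clear();
        }
    }
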
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index d430314..2c1cf0e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -131,7 +131,6 @@ public class SSTableWriter extends SSTable
               metadata,
               partitioner);
         this.repairedAt = repairedAt;
-        iwriter = new IndexWriter(keyCount);
 
         if (compression)
         {
@@ -146,6 +145,7 @@ public class SSTableWriter extends SSTable
             dataFile = SequentialWriter.open(new File(getFilename()), new File(descriptor.filenameFor(Component.CRC)));
             dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
         }
+        iwriter = new IndexWriter(keyCount, dataFile);
 
         this.sstableMetadataCollector = sstableMetadataCollector;
     }
@@ -183,7 +183,7 @@ public class SSTableWriter extends SSTable
 
         if (logger.isTraceEnabled())
             logger.trace("wrote " + decoratedKey + " at " + dataPosition);
-        iwriter.append(decoratedKey, index);
+        iwriter.append(decoratedKey, index, dataPosition);
         dbuilder.addPotentialBoundary(dataPosition);
     }
 
@@ -193,11 +193,11 @@ public class SSTableWriter extends SSTable
      */
     public RowIndexEntry append(AbstractCompactedRow row)
     {
-        long currentPosition = beforeAppend(row.key);
+        long startPosition = beforeAppend(row.key);
         RowIndexEntry entry;
         try
         {
-            entry = row.write(currentPosition, dataFile.stream);
+            entry = row.write(startPosition, dataFile.stream);
             if (entry == null)
                 return null;
         }
@@ -205,8 +205,9 @@ public class SSTableWriter extends SSTable
         {
             throw new FSWriteError(e, dataFile.getPath());
         }
-        sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
-        afterAppend(row.key, currentPosition, entry);
+        long endPosition = dataFile.getFilePointer();
+        sstableMetadataCollector.update(endPosition - startPosition, row.columnStats());
+        afterAppend(row.key, endPosition, entry);
         return entry;
     }
 
@@ -390,10 +391,11 @@ public class SSTableWriter extends SSTable
                                                   repairedAt).get(MetadataType.STATS);
 
         // find the last boundary that is fully readable in both the index and data files
-        DecoratedKey exclusiveUpperBoundOfReadableIndex = iwriter.getMaxReadableKey(0);
-        if (exclusiveUpperBoundOfReadableIndex == null)
+        IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
+        if (boundary == null)
             return null;
 
+        assert boundary.indexLength > 0 && boundary.dataLength > 0;
         Descriptor link = makeTmpLinks();
         // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable to other consumers
         SegmentedFile ifile = iwriter.builder.complete(link.filenameFor(Component.PRIMARY_INDEX), FinishType.EARLY);
@@ -401,33 +403,12 @@ public class SSTableWriter extends SSTable
         SSTableReader sstable = SSTableReader.internalOpen(descriptor.asType(Descriptor.Type.FINAL),
                                                            components, metadata,
                                                            partitioner, ifile,
-                                                           dfile, iwriter.summary.build(partitioner, exclusiveUpperBoundOfReadableIndex),
+                                                           dfile, iwriter.summary.build(partitioner, boundary.lastKey),
                                                            iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
 
         // now it's open, set the first and last keys; the boundary already guarantees both files are readable through lastKey
         sstable.first = getMinimalKey(first);
-        sstable.last = getMinimalKey(exclusiveUpperBoundOfReadableIndex);
-        DecoratedKey inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(1);
-        if (inclusiveUpperBoundOfReadableData == null)
-        {
-            // Prevent leaving tmplink files on disk
-            sstable.selfRef().release();
-            return null;
-        }
-        int offset = 2;
-        while (true)
-        {
-            RowIndexEntry indexEntry = sstable.getPosition(inclusiveUpperBoundOfReadableData, SSTableReader.Operator.GT);
-            if (indexEntry != null && indexEntry.position <= dataFile.getLastFlushOffset())
-                break;
-            inclusiveUpperBoundOfReadableData = iwriter.getMaxReadableKey(offset++);
-            if (inclusiveUpperBoundOfReadableData == null)
-            {
-                sstable.selfRef().release();
-                return null;
-            }
-        }
-        sstable.last = getMinimalKey(inclusiveUpperBoundOfReadableData);
+        sstable.last = getMinimalKey(boundary.lastKey);
         return sstable;
     }
 
@@ -593,25 +574,39 @@ public class SSTableWriter extends SSTable
         public final IFilter bf;
         private FileMark mark;
 
-        IndexWriter(long keyCount)
+        IndexWriter(long keyCount, final SequentialWriter dataFile)
         {
             indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
             builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
             summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL);
             bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true);
+            // register listeners to be alerted when the index and data files are flushed
+            indexFile.setPostFlushListener(new Runnable()
+            {
+                public void run()
+                {
+                    summary.markIndexSynced(indexFile.getLastFlushOffset());
+                }
+            });
+            dataFile.setPostFlushListener(new Runnable()
+            {
+                public void run()
+                {
+                    summary.markDataSynced(dataFile.getLastFlushOffset());
+                }
+            });
         }
 
         // finds the last boundary (last key plus index/data file lengths) guaranteed fully readable in the flushed portions of both files
-        DecoratedKey getMaxReadableKey(int offset)
+        IndexSummaryBuilder.ReadableBoundary getMaxReadable()
         {
-            long maxIndexLength = indexFile.getLastFlushOffset();
-            return summary.getMaxReadableKey(maxIndexLength, offset);
+            return summary.getLastReadableBoundary();
         }
 
-        public void append(DecoratedKey key, RowIndexEntry indexEntry)
+        public void append(DecoratedKey key, RowIndexEntry indexEntry, long dataEnd)
         {
             bf.add(key.getKey());
-            long indexPosition = indexFile.getFilePointer();
+            long indexStart = indexFile.getFilePointer();
             try
             {
                 ByteBufferUtil.writeWithShortLength(key.getKey(), indexFile.stream);
@@ -621,12 +616,13 @@ public class SSTableWriter extends SSTable
             {
                 throw new FSWriteError(e, indexFile.getPath());
             }
+            long indexEnd = indexFile.getFilePointer();
 
             if (logger.isTraceEnabled())
-                logger.trace("wrote index entry: " + indexEntry + " at " + indexPosition);
+                logger.trace("wrote index entry: " + indexEntry + " at " + indexStart);
 
-            summary.maybeAddEntry(key, indexPosition);
-            builder.addPotentialBoundary(indexPosition);
+            summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
+            builder.addPotentialBoundary(indexStart);
         }
 
         public void abort()

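Net effect of the SSTableWriter changes: openEarly no longer probes getMaxReadableKey(offset) in a
loop, re-querying the index until it finds a key whose data position has been flushed; it asks the
summary builder for the precomputed boundary once. A condensed, hypothetical rendering of the new
control flow (the helper openReaderAt is invented):

    SSTableReader tryOpenEarly()
    {
        IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
        if (boundary == null)
            return null; // no summary interval is durable in both files yet

        // open with a summary truncated at boundary.lastKey (invented helper)
        SSTableReader sstable = openReaderAt(boundary);
        sstable.first = getMinimalKey(first);
        sstable.last = getMinimalKey(boundary.lastKey); // guaranteed readable; no probing required
        return sstable;
    }
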
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4eb9fa78/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 227c79d..40f3e9d 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -69,6 +69,8 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
     public final DataOutputPlus stream;
     protected long lastFlushOffset;
 
+    protected Runnable runPostFlush;
+
     public SequentialWriter(File file, int bufferSize)
     {
         try
@@ -304,6 +306,12 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         }
     }
 
+    public void setPostFlushListener(Runnable runPostFlush)
+    {
+        assert this.runPostFlush == null;
+        this.runPostFlush = runPostFlush;
+    }
+
     /**
      * Override this method instead of overriding flush()
      * @throws FSWriteError on any I/O error.
@@ -319,6 +327,8 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         {
             throw new FSWriteError(e, getPath());
         }
+        if (runPostFlush != null)
+            runPostFlush.run();
     }
 
     public long getFilePointer()
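
Finally, a hedged usage sketch of the new listener API: each writer accepts at most one post-flush
listener (enforced by the assert above), invoked after every successful flush, at which point
getLastFlushOffset() reflects the newly flushed length. The file name here is illustrative only:

    final SequentialWriter writer = SequentialWriter.open(new File("example-tmp.db"));
    writer.setPostFlushListener(new Runnable()
    {
        public void run()
        {
            // safe to read here: the flush that triggered this callback has completed
            System.out.println("flushed to " + writer.getLastFlushOffset());
        }
    });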