You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/01/07 03:56:16 UTC

svn commit: r896741 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/io/

Author: jbellis
Date: Thu Jan  7 02:56:16 2010
New Revision: 896741

URL: http://svn.apache.org/viewvc?rev=896741&view=rev
Log:
Store the data position and size for any index entry that spans a mmap segment boundary when loading the index (which is read with a BufferedRandomAccessFile), so we don't have to deal with the boundary at read time.
patch by jbellis; tested by Brandon Williams for CASSANDRA-669

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableAccessor.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java?rev=896741&r1=896740&r2=896741&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java Thu Jan  7 02:56:16 2010
@@ -25,6 +25,7 @@
 
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.ICompactSerializer2;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -98,6 +99,21 @@
         return key != null && key.isEmpty();
     }
 
+    /** not efficient.  call rarely. */
+    public int serializedSize()
+    {
+        DataOutputBuffer dos = new DataOutputBuffer();
+        try
+        {
+            serializer.serialize(this, dos);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        return dos.getLength();
+    }
+
     @Override
     public String toString()
     {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=896741&r1=896740&r2=896741&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Thu Jan  7 02:56:16 2010
@@ -25,6 +25,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Arrays;
+import java.util.Map;
 
 import org.apache.log4j.Logger;
 import org.apache.commons.lang.StringUtils;
@@ -56,6 +57,7 @@
     protected IPartitioner partitioner;
     protected BloomFilter bf;
     protected List<KeyPosition> indexPositions;
+    protected Map<KeyPosition, PositionSize> spannedIndexDataPositions; // map of index position, to data position, for index entries spanning mmap segments
     protected String columnFamilyName;
 
     /* Every 128th index entry is loaded into memory so we know where to start looking for the actual key w/o seeking */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=896741&r1=896740&r2=896741&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Jan  7 02:56:16 2010
@@ -90,7 +90,7 @@
         };
         new Thread(runnable, "SSTABLE-DELETER").start();
     }};
-    private final int BUFFER_SIZE = Integer.MAX_VALUE;
+    private static final int BUFFER_SIZE = Integer.MAX_VALUE;
 
     public static int indexInterval()
     {
@@ -196,7 +196,11 @@
 
     private ConcurrentLinkedHashMap<DecoratedKey, PositionSize> keyCache;
 
-    SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter, ConcurrentLinkedHashMap<DecoratedKey, PositionSize> keyCache)
+    SSTableReader(String filename,
+                  IPartitioner partitioner,
+                  List<KeyPosition> indexPositions, Map<KeyPosition, PositionSize> spannedIndexDataPositions,
+                  BloomFilter bloomFilter,
+                  ConcurrentLinkedHashMap<DecoratedKey, PositionSize> keyCache)
             throws IOException
     {
         super(filename, partitioner);
@@ -219,6 +223,7 @@
         }
 
         this.indexPositions = indexPositions;
+        this.spannedIndexDataPositions = spannedIndexDataPositions;
         this.bf = bloomFilter;
         phantomReference = new FileDeletingReference(this, finalizerQueue);
         finalizers.add(phantomReference);
@@ -261,7 +266,7 @@
 
     private SSTableReader(String filename, IPartitioner partitioner) throws IOException
     {
-        this(filename, partitioner, null, null, null);
+        this(filename, partitioner, null, null, null, null);
     }
 
     public List<KeyPosition> getIndexPositions()
@@ -285,28 +290,54 @@
     void loadIndexFile() throws IOException
     {
         indexPositions = new ArrayList<KeyPosition>();
-
-        FileDataInput input = new MappedFileDataInput(indexBuffer, indexFilename());
-        int i = 0;
-        long indexSize = input.length();
-        while (true)
+        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+        // any entries that do, we force into the in-memory sample so key lookup can always bsearch within
+        // a single mmapped segment.
+        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
+        try
         {
-            long indexPosition = input.getFilePointer();
-            if (indexPosition == indexSize)
-            {
-                break;
-            }
-            DecoratedKey decoratedKey = partitioner.convertFromDiskFormat(input.readUTF());
-            input.readLong();
-            if (i++ % INDEX_INTERVAL == 0)
+            int i = 0;
+            long indexSize = input.length();
+            while (true)
             {
-                indexPositions.add(new KeyPosition(decoratedKey, indexPosition));
+                long indexPosition = input.getFilePointer();
+                if (indexPosition == indexSize)
+                {
+                    break;
+                }
+                DecoratedKey decoratedKey = partitioner.convertFromDiskFormat(input.readUTF());
+                long dataPosition = input.readLong();
+                long nextIndexPosition = input.getFilePointer();
+                boolean spannedEntry = bufferIndex(indexPosition) != bufferIndex(nextIndexPosition);
+                if (i++ % INDEX_INTERVAL == 0 || spannedEntry)
+                {
+                    KeyPosition info;
+                    info = new KeyPosition(decoratedKey, indexPosition);
+                    indexPositions.add(info);
+
+                    if (spannedEntry)
+                    {
+                        if (spannedIndexDataPositions == null)
+                        {
+                            spannedIndexDataPositions = new HashMap<KeyPosition, PositionSize>();
+                        }
+                        // read the next index entry to see how big the row is corresponding to the current, mmap-segment-spanning one
+                        input.readUTF();
+                        long nextDataPosition = input.readLong();
+                        input.seek(nextIndexPosition);
+                        spannedIndexDataPositions.put(info, new PositionSize(dataPosition, nextDataPosition - dataPosition));
+                    }
+                }
             }
         }
+        finally
+        {
+            input.close();
+        }
     }
 
     /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
-    private long getIndexScanPosition(DecoratedKey decoratedKey)
+    private KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
     {
         assert indexPositions != null && indexPositions.size() > 0;
         int index = Collections.binarySearch(indexPositions, new KeyPosition(decoratedKey, -1));
@@ -316,12 +347,12 @@
             // i.e., its insertion position
             int greaterThan = (index + 1) * -1;
             if (greaterThan == 0)
-                return -1;
-            return indexPositions.get(greaterThan - 1).position;
+                return null;
+            return indexPositions.get(greaterThan - 1);
         }
         else
         {
-            return indexPositions.get(index).position;
+            return indexPositions.get(index);
         }
     }
 
@@ -340,14 +371,20 @@
                 return cachedPosition;
             }
         }
-        long start = getIndexScanPosition(decoratedKey);
-        if (start < 0)
+        KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+        if (sampledPosition == null)
         {
             return null;
         }
+        if (spannedIndexDataPositions != null)
+        {
+            PositionSize info = spannedIndexDataPositions.get(sampledPosition);
+            if (info != null)
+                return info;
+        }
 
         FileDataInput input = new MappedFileDataInput(indexBuffer, indexFilename());
-        input.seek(start);
+        input.seek(sampledPosition.position);
         int i = 0;
         do
         {
@@ -388,11 +425,29 @@
     /** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */
     public long getNearestPosition(DecoratedKey decoratedKey) throws IOException
     {
-        long start = getIndexScanPosition(decoratedKey);
-        if (start < 0)
+        KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+        if (sampledPosition == null)
         {
             return 0;
         }
+
+        // by default, we plan to start scanning at the nearest bsearched index entry
+        long start = sampledPosition.position;
+        if (spannedIndexDataPositions != null)
+        {
+            // check if the index entry spans a mmap segment boundary
+            PositionSize info = spannedIndexDataPositions.get(sampledPosition);
+            if (info != null)
+            {
+                // if the key matches the index entry we don't have to scan the index after all
+                if (sampledPosition.key.compareTo(decoratedKey) == 0)
+                    return info.position;
+                // otherwise, start scanning at the next entry (which won't span a boundary;
+                // if it did it would have been in the index sample and we would have started with that instead)
+                start = info.position + sampledPosition.key.serializedSize() + (Long.SIZE / 8);
+            }
+        }
+
         BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(path), "r");
         input.seek(start);
         try
@@ -483,7 +538,7 @@
         return new MappedFileDataInput(buffers[bufferIndex(info.position)], path, (int) (info.position % BUFFER_SIZE));
     }
 
-    private int bufferIndex(long position)
+    static int bufferIndex(long position)
     {
         return (int) (position / BUFFER_SIZE);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=896741&r1=896740&r2=896741&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java Thu Jan  7 02:56:16 2010
@@ -23,6 +23,7 @@
 
 import java.io.*;
 import java.util.ArrayList;
+import java.util.HashMap;
 
 import org.apache.log4j.Logger;
 
@@ -70,26 +71,38 @@
         return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
     }
 
-    private void afterAppend(DecoratedKey decoratedKey, long position) throws IOException
+    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, int dataSize) throws IOException
     {
         String diskKey = partitioner.convertToDiskFormat(decoratedKey);
         bf.add(diskKey);
         lastWrittenKey = decoratedKey;
         long indexPosition = indexFile.getFilePointer();
         indexFile.writeUTF(diskKey);
-        indexFile.writeLong(position);
+        indexFile.writeLong(dataPosition);
         if (logger.isTraceEnabled())
-            logger.trace("wrote " + decoratedKey + " at " + position);
+            logger.trace("wrote " + decoratedKey + " at " + dataPosition);
+        if (logger.isTraceEnabled())
+            logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
 
-        if (keysWritten++ % INDEX_INTERVAL != 0)
-            return;
-        if (indexPositions == null)
+        boolean spannedEntry = SSTableReader.bufferIndex(indexPosition) != SSTableReader.bufferIndex(indexFile.getFilePointer());
+        if (keysWritten++ % INDEX_INTERVAL == 0 || spannedEntry)
         {
-            indexPositions = new ArrayList<KeyPosition>();
+            if (indexPositions == null)
+            {
+                indexPositions = new ArrayList<KeyPosition>();
+            }
+            KeyPosition info = new KeyPosition(decoratedKey, indexPosition);
+            indexPositions.add(info);
+
+            if (spannedEntry)
+            {
+                if (spannedIndexDataPositions == null)
+                {
+                    spannedIndexDataPositions = new HashMap<KeyPosition, PositionSize>();
+                }
+                spannedIndexDataPositions.put(info, new PositionSize(dataPosition, dataSize));
+            }
         }
-        indexPositions.add(new KeyPosition(decoratedKey, indexPosition));
-        if (logger.isTraceEnabled())
-            logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
     }
 
     // TODO make this take a DataOutputStream and wrap the byte[] version to combine them
@@ -101,7 +114,7 @@
         assert length > 0;
         dataFile.writeInt(length);
         dataFile.write(buffer.getData(), 0, length);
-        afterAppend(decoratedKey, currentPosition);
+        afterAppend(decoratedKey, currentPosition, length);
     }
 
     public void append(DecoratedKey decoratedKey, byte[] value) throws IOException
@@ -111,7 +124,7 @@
         assert value.length > 0;
         dataFile.writeInt(value.length);
         dataFile.write(value);
-        afterAppend(decoratedKey, currentPosition);
+        afterAppend(decoratedKey, currentPosition, value.length);
     }
 
     /**
@@ -141,7 +154,7 @@
         ConcurrentLinkedHashMap<DecoratedKey, SSTableReader.PositionSize> keyCache = cacheFraction > 0
                                                         ? SSTableReader.createKeyCache((int) (cacheFraction * keysWritten))
                                                         : null;
-        return new SSTableReader(path, partitioner, indexPositions, bf, keyCache);
+        return new SSTableReader(path, partitioner, indexPositions, spannedIndexDataPositions, bf, keyCache);
     }
 
     static String rename(String tmpFilename)

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableAccessor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableAccessor.java?rev=896741&r1=896740&r2=896741&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableAccessor.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/io/SSTableAccessor.java Thu Jan  7 02:56:16 2010
@@ -27,7 +27,7 @@
     public static SSTableReader getSSTableReader(String filename, IPartitioner<?> partitioner)
     throws IOException
     {
-        SSTableReader sstable =  new SSTableReader(filename, partitioner, null, null, null);
+        SSTableReader sstable =  new SSTableReader(filename, partitioner, null, null, null, null);
         sstable.loadBloomFilter();
         sstable.loadIndexFile();
         return sstable;