You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/06/15 15:29:08 UTC

svn commit: r954875 - in /cassandra/trunk/src/java/org/apache/cassandra/io/sstable: RowIndexedReader.java RowIndexedScanner.java SSTableReader.java SSTableScanner.java SSTableWriter.java

Author: jbellis
Date: Tue Jun 15 13:29:07 2010
New Revision: 954875

URL: http://svn.apache.org/viewvc?rev=954875&view=rev
Log:
merge {SSTable,RowIndexed}Reader, {SSTable,RowIndexed}Scanner
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1127

Removed:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=954875&r1=954874&r2=954875&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Tue Jun 15 13:29:07 2010
@@ -26,6 +26,13 @@ import java.lang.ref.Reference;
 import java.nio.channels.FileChannel;
 import java.nio.MappedByteBuffer;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,10 +51,13 @@ import org.apache.cassandra.io.util.File
  * SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
  * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
  */
-public abstract class SSTableReader extends SSTable implements Comparable<SSTableReader>
+public class SSTableReader extends SSTable implements Comparable<SSTableReader>
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 
+    // guesstimated size of INDEX_INTERVAL index entries
+    private static final int INDEX_FILE_BUFFER_BYTES = 16 * IndexSummary.INDEX_INTERVAL;
+
     // `finalizers` is required to keep the PhantomReferences alive after the enclosing SSTR is itself
     // unreferenced.  otherwise they will never get enqueued.
     private static final Set<Reference<SSTableReader>> finalizers = new HashSet<Reference<SSTableReader>>();
@@ -97,6 +107,14 @@ public abstract class SSTableReader exte
      */
     public final long maxDataAge;
 
+    // indexfile and datafile: might be null before a call to load()
+    private SegmentedFile ifile;
+    private SegmentedFile dfile;
+
+    private InstrumentedCache<Pair<Descriptor,DecoratedKey>, Long> keyCache;
+
+    private volatile SSTableDeletingReference phantomReference;
+
     public static int indexInterval()
     {
         return IndexSummary.INDEX_INTERVAL;
@@ -144,7 +162,7 @@ public abstract class SSTableReader exte
         // FIXME: version conditional readers here
         if (true)
         {
-            sstable = RowIndexedReader.internalOpen(descriptor, partitioner);
+            sstable = internalOpen(descriptor, partitioner);
         }
 
         if (logger.isDebugEnabled())
@@ -153,39 +171,173 @@ public abstract class SSTableReader exte
         return sstable;
     }
 
+    /** Open an SSTableReader which needs its state loaded from disk. */
+    static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner) throws IOException
+    {
+        SSTableReader sstable = new SSTableReader(desc, partitioner, null, null, null, null, System.currentTimeMillis());
+
+        // versions before 'c' encoded keys as utf-16 before hashing to the filter
+        if (desc.versionCompareTo("c") < 0)
+            sstable.load(true);
+        else
+        {
+            sstable.load(false);
+            sstable.loadBloomFilter();
+        }
+
+        return sstable;
+    }
+
+    /**
+     * Open an SSTableReader which already has its state initialized (by SSTableWriter).
+     */
+    static SSTableReader internalOpen(Descriptor desc, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge) throws IOException
+    {
+        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null;
+        return new SSTableReader(desc, partitioner, ifile, dfile, isummary, bf, maxDataAge);
+    }
+
+    SSTableReader(Descriptor desc,
+                     IPartitioner partitioner,
+                     SegmentedFile ifile,
+                     SegmentedFile dfile,
+                     IndexSummary indexSummary,
+                     BloomFilter bloomFilter,
+                     long maxDataAge)
+    throws IOException
+    {
+        super(desc, partitioner);
+        this.maxDataAge = maxDataAge;
+
+
+        this.ifile = ifile;
+        this.dfile = dfile;
+        this.indexSummary = indexSummary;
+        this.bf = bloomFilter;
+    }
+
     public void setTrackedBy(SSTableTracker tracker)
     {
         phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue);
         finalizers.add(phantomReference);
+        keyCache = tracker.getKeyCache();
     }
 
-    protected SSTableReader(Descriptor desc, IPartitioner partitioner, long maxDataAge)
+    void loadBloomFilter() throws IOException
     {
-        super(desc, partitioner);
-        this.maxDataAge = maxDataAge;
+        DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename()));
+        try
+        {
+            bf = BloomFilter.serializer().deserialize(stream);
+        }
+        finally
+        {
+            stream.close();
+        }
     }
 
-    private volatile SSTableDeletingReference phantomReference;
+    /**
+     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
+     */
+    private void load(boolean recreatebloom) throws IOException
+    {
+        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder();
+        SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder();
+
+        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
+        indexSummary = new IndexSummary();
+        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
+        try
+        {
+            long indexSize = input.length();
+            if (recreatebloom)
+                // estimate key count based on index length; clamp so the int cast cannot wrap negative
+                bf = BloomFilter.getFilter((int) Math.min(Integer.MAX_VALUE, indexSize / 32), 15);
+            while (true)
+            {
+                long indexPosition = input.getFilePointer();
+                if (indexPosition == indexSize)
+                    break;
+
+                DecoratedKey decoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
+                if (recreatebloom)
+                    bf.add(decoratedKey.key);
+                long dataPosition = input.readLong();
+
+                indexSummary.maybeAddEntry(decoratedKey, indexPosition);
+                ibuilder.addPotentialBoundary(indexPosition);
+                dbuilder.addPotentialBoundary(dataPosition);
+            }
+            // the summary is completed exactly once, below, after the index file has been closed
+        }
+        finally
+        {
+            input.close();
+        }
+
+        // finalize the state of the reader
+        indexSummary.complete();
+        ifile = ibuilder.complete(indexFilename());
+        dfile = dbuilder.complete(getFilename());
+    }
+
+    /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
+    private IndexSummary.KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
+    {
+        assert indexSummary.getIndexPositions() != null && indexSummary.getIndexPositions().size() > 0;
+        int index = Collections.binarySearch(indexSummary.getIndexPositions(), new IndexSummary.KeyPosition(decoratedKey, -1));
+        if (index < 0)
+        {
+            // binary search gives us the first index _greater_ than the key searched for,
+            // i.e., its insertion position
+            int greaterThan = (index + 1) * -1;
+            if (greaterThan == 0)
+                return null;
+            return indexSummary.getIndexPositions().get(greaterThan - 1);
+        }
+        else
+        {
+            return indexSummary.getIndexPositions().get(index);
+        }
+    }
 
     /**
      * For testing purposes only.
      */
-    public abstract void forceFilterFailures();
+    public void forceFilterFailures()
+    {
+        bf = BloomFilter.alwaysMatchingBloomFilter();
+    }
 
     /**
      * @return The key cache: for monitoring purposes.
      */
-    public abstract InstrumentedCache getKeyCache();
+    public InstrumentedCache getKeyCache()
+    {
+        return keyCache;
+    }
 
     /**
      * @return An estimate of the number of keys in this SSTable.
      */
-    public abstract long estimatedKeys();
+    public long estimatedKeys()
+    {
+        return (long) indexSummary.getIndexPositions().size() * IndexSummary.INDEX_INTERVAL; // widen first: the int product can overflow
+    }
 
     /**
      * @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable.
      */
-    public abstract Collection<DecoratedKey> getKeySamples();
+    public Collection<DecoratedKey> getKeySamples()
+    {
+        return Collections2.transform(indexSummary.getIndexPositions(),
+                                      new Function<IndexSummary.KeyPosition, DecoratedKey>(){
+                                          public DecoratedKey apply(IndexSummary.KeyPosition kp)
+                                          {
+                                              return kp.key;
+                                          }
+                                      });
+    }
 
     /**
      * Returns the position in the data file to find the given key, or -1 if the
@@ -193,7 +345,71 @@ public abstract class SSTableReader exte
      * FIXME: should not be public: use Scanner.
      */
     @Deprecated
-    public abstract long getPosition(DecoratedKey decoratedKey) throws IOException;
+    public long getPosition(DecoratedKey decoratedKey)
+    {
+        // first, check bloom filter
+        if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
+            return -1;
+
+        // next, the key cache
+        Pair<Descriptor, DecoratedKey> unifiedKey = new Pair<Descriptor, DecoratedKey>(desc, decoratedKey);
+        if (keyCache != null && keyCache.getCapacity() > 0)
+        {
+            Long cachedPosition = keyCache.get(unifiedKey);
+            if (cachedPosition != null)
+            {
+                return cachedPosition;
+            }
+        }
+
+        // next, see if the sampled index says it's impossible for the key to be present
+        IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+        if (sampledPosition == null)
+            return -1;
+
+        // scan the on-disk index, starting at the nearest sampled position
+        int i = 0;
+        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition.indexPosition, INDEX_FILE_BUFFER_BYTES);
+        while (segments.hasNext())
+        {
+            FileDataInput input = segments.next();
+            try
+            {
+                while (!input.isEOF() && i++ < IndexSummary.INDEX_INTERVAL)
+                {
+                    // read key & data position from index entry
+                    DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
+                    long dataPosition = input.readLong();
+
+                    int v = indexDecoratedKey.compareTo(decoratedKey);
+                    if (v == 0)
+                    {
+                        if (keyCache != null && keyCache.getCapacity() > 0)
+                            keyCache.put(unifiedKey, Long.valueOf(dataPosition));
+                        return dataPosition;
+                    }
+                    if (v > 0)
+                        return -1;
+                }
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            finally
+            {
+                try
+                {
+                    input.close();
+                }
+                catch (IOException e)
+                {
+                    logger.error("error closing file", e);
+                }
+            }
+        }
+        return -1;
+    }
 
     /**
      * Like getPosition, but if key is not found will return the location of the
@@ -201,12 +417,54 @@ public abstract class SSTableReader exte
      * FIXME: should not be public: use Scanner.
      */
     @Deprecated
-    public abstract long getNearestPosition(DecoratedKey decoratedKey) throws IOException;
+    public long getNearestPosition(DecoratedKey decoratedKey)
+    {
+        IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+        if (sampledPosition == null)
+            return 0;
+
+        // scan the on-disk index, starting at the nearest sampled position
+        Iterator<FileDataInput> segiter = ifile.iterator(sampledPosition.indexPosition, INDEX_FILE_BUFFER_BYTES);
+        while (segiter.hasNext())
+        {
+            FileDataInput input = segiter.next();
+            try
+            {
+                while (!input.isEOF())
+                {
+                    DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
+                    long position = input.readLong();
+                    int v = indexDecoratedKey.compareTo(decoratedKey);
+                    if (v >= 0)
+                        return position;
+                }
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            finally
+            {
+                try
+                {
+                    input.close();
+                }
+                catch (IOException e)
+                {
+                    logger.error("error closing file", e);
+                }
+            }
+        }
+        return -1;
+    }
 
     /**
      * @return The length in bytes of the data file for this SSTable.
      */
-    public abstract long length();
+    public long length()
+    {
+        return dfile.length;
+    }
 
     public void markCompacted()
     {
@@ -228,16 +486,35 @@ public abstract class SSTableReader exte
      * @param bufferSize Buffer size in bytes for this Scanner.
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public abstract SSTableScanner getScanner(int bufferSize);
+    public SSTableScanner getScanner(int bufferSize)
+    {
+        return new SSTableScanner(this, bufferSize);
+    }
 
     /**
      * @param bufferSize Buffer size in bytes for this Scanner.
      * @param filter filter to use when reading the columns
      * @return A Scanner for seeking over the rows of the SSTable.
      */
-    public abstract SSTableScanner getScanner(int bufferSize, QueryFilter filter);
-    
-    public abstract FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize);
+    public SSTableScanner getScanner(int bufferSize, QueryFilter filter)
+    {
+        return new SSTableScanner(this, filter, bufferSize);
+    }
+
+    public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize)
+    {
+        long position = getPosition(decoratedKey);
+        if (position < 0)
+            return null;
+
+        return dfile.getSegment(position, bufferSize);
+    }
+
+
+    public int compareTo(SSTableReader o)
+    {
+        return desc.generation < o.desc.generation ? -1 : (desc.generation > o.desc.generation ? 1 : 0); // orders by generation; no subtraction, which could overflow
+    }
 
     public AbstractType getColumnComparator()
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=954875&r1=954874&r2=954875&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Tue Jun 15 13:29:07 2010
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,17 +20,177 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.io.IOError;
 import java.util.Iterator;
+import java.util.Arrays;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.filter.IColumnIterator;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public abstract class SSTableScanner implements Iterator<IColumnIterator>, Closeable
-{
-    public abstract void seekTo(DecoratedKey seekKey);
 
-    public abstract long getFileLength();
+public class SSTableScanner implements Iterator<IColumnIterator>, Closeable
+{
+    private static Logger logger = LoggerFactory.getLogger(SSTableScanner.class);
 
-    public abstract long getFilePointer();
+    private final BufferedRandomAccessFile file;
+    private final SSTableReader sstable;
+    private IColumnIterator row;
+    private boolean exhausted = false;
+    private Iterator<IColumnIterator> iterator;
+    private QueryFilter filter;
+
+    /**
+     * @param sstable SSTable to scan.
+     */
+    SSTableScanner(SSTableReader sstable, int bufferSize)
+    {
+        try
+        {
+            this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", bufferSize);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+        this.sstable = sstable;
+    }
+
+    /**
+     * @param sstable SSTable to scan.
+     * @param filter filter to use when scanning the columns
+     */
+    SSTableScanner(SSTableReader sstable, QueryFilter filter, int bufferSize)
+    {
+        try
+        {
+            this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", bufferSize);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+        this.sstable = sstable;
+        this.filter = filter;
+    }
+
+    public void close() throws IOException
+    {
+        file.close();
+    }
+
+    public void seekTo(DecoratedKey seekKey)
+    {
+        try
+        {
+            long position = sstable.getNearestPosition(seekKey);
+            if (position < 0)
+            {
+                exhausted = true;
+                return;
+            }
+            file.seek(position);
+            row = null;
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("corrupt sstable", e);
+        }
+    }
+
+    public long getFileLength()
+    {
+        try
+        {
+            return file.length();
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    public long getFilePointer()
+    {
+        return file.getFilePointer();
+    }
+
+    public boolean hasNext()
+    {
+        if (iterator == null)
+            iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : new KeyScanningIterator();
+        return iterator.hasNext();
+    }
+
+    public IColumnIterator next()
+    {
+        if (iterator == null)
+            iterator = exhausted ? Arrays.asList(new IColumnIterator[0]).iterator() : new KeyScanningIterator();
+        return iterator.next();
+    }
+
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    private class KeyScanningIterator implements Iterator<IColumnIterator>
+    {
+        private long dataStart;
+        private long finishedAt;
+
+        public boolean hasNext()
+        {
+            try
+            {
+                if (row == null)
+                    return !file.isEOF();
+                return finishedAt < file.length();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public IColumnIterator next()
+        {
+            try
+            {
+                if (row != null)
+                    file.seek(finishedAt);
+                assert !file.isEOF();
+
+                DecoratedKey key = StorageService.getPartitioner().convertFromDiskFormat(FBUtilities.readShortByteArray(file));
+                int dataSize = file.readInt();
+                dataStart = file.getFilePointer();
+                finishedAt = dataStart + dataSize;
+
+                if (filter == null)
+                {
+                    return row = new SSTableIdentityIterator(sstable, file, key, dataStart, finishedAt);
+                }
+                else
+                {
+                    return row = filter.getSSTableColumnIterator(sstable, file, key, dataStart);
+                }
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=954875&r1=954874&r2=954875&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Jun 15 13:29:07 2010
@@ -163,7 +163,7 @@ public class SSTableWriter extends SSTab
         SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
         ibuilder = null;
         dbuilder = null;
-        return RowIndexedReader.internalOpen(newdesc, partitioner, ifile, dfile, indexSummary, bf, maxDataAge);
+        return SSTableReader.internalOpen(newdesc, partitioner, ifile, dfile, indexSummary, bf, maxDataAge);
     }
 
     static Descriptor rename(Descriptor tmpdesc)