You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/04/24 01:49:32 UTC

git commit: Save IndexSummary into new SSTable 'Summary' component; patch by Vijay and Pavel Yaskevich; reviewed by Pavel Yaskevich for CASSANDRA-2392

Updated Branches:
  refs/heads/trunk 7b3349f6e -> 048741868


Save IndexSummary into new SSTable 'Summary' component
patch by Vijay and Pavel Yaskevich; reviewed by Pavel Yaskevich for CASSANDRA-2392


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/04874186
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/04874186
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/04874186

Branch: refs/heads/trunk
Commit: 04874186892c86a20181a2f64c5dc24285021b2c
Parents: 7b3349f
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Mon Apr 23 16:28:14 2012 -0700
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon Apr 23 16:28:14 2012 -0700

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/io/sstable/Component.java |    6 +-
 .../apache/cassandra/io/sstable/IndexSummary.java  |   44 ++++
 .../org/apache/cassandra/io/sstable/SSTable.java   |    4 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |  154 ++++++++++-----
 .../apache/cassandra/io/sstable/SSTableWriter.java |   12 +-
 .../cassandra/io/util/MmappedSegmentedFile.java    |   29 +++-
 .../apache/cassandra/io/util/SegmentedFile.java    |   14 ++
 8 files changed, 204 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0ecba36..7538980 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060)
  * (CLI) jline version is bumped to 1.0 to properly  support
    'delete' key function (CASSANDRA-4132)
+ * Save IndexSummary into new SSTable 'Summary' component (CASSANDRA-2392)
 
 
 1.1.1-dev

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 1f34222..45f2875 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -50,7 +50,9 @@ public class Component
         // statistical metadata about the content of the sstable
         STATS("Statistics.db"),
         // holds sha1 sum of the data file (to be checked by sha1sum)
-        DIGEST("Digest.sha1");
+        DIGEST("Digest.sha1"),
+        // holds SSTable Index Summary and Boundaries
+        SUMMARY("Summary.db");
 
         final String repr;
         Type(String repr)
@@ -75,6 +77,7 @@ public class Component
     public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO, -1);
     public final static Component STATS = new Component(Type.STATS, -1);
     public final static Component DIGEST = new Component(Type.DIGEST, -1);
+    public final static Component SUMMARY = new Component(Type.SUMMARY, -1);
 
     public final Type type;
     public final int id;
@@ -122,6 +125,7 @@ public class Component
             case COMPRESSION_INFO:  component = Component.COMPRESSION_INFO; break;
             case STATS:             component = Component.STATS;            break;
             case DIGEST:            component = Component.DIGEST;           break;
+            case SUMMARY:           component = Component.SUMMARY;          break;
             default:
                  throw new IllegalStateException();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index e36bc90..3cac781 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -17,11 +17,17 @@
  */
 package org.apache.cassandra.io.sstable;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Two approaches to building an IndexSummary:
@@ -30,6 +36,7 @@ import org.apache.cassandra.db.DecoratedKey;
  */
 public class IndexSummary
 {
+    public static final IndexSummarySerializer serializer = new IndexSummarySerializer();
     private final ArrayList<Long> positions;
     private final ArrayList<DecoratedKey> keys;
     private long keysWritten = 0;
@@ -44,6 +51,12 @@ public class IndexSummary
         keys = new ArrayList<DecoratedKey>((int)expectedEntries);
     }
 
+    private IndexSummary()
+    {
+        positions = new ArrayList<Long>();
+        keys = new ArrayList<DecoratedKey>();
+    }
+
     public void incrementRowid()
     {
         keysWritten++;
@@ -82,4 +95,35 @@ public class IndexSummary
         keys.trimToSize();
         positions.trimToSize();
     }
+
+    public static class IndexSummarySerializer
+    {
+        public void serialize(IndexSummary t, DataOutput dos) throws IOException
+        {
+            assert t.keys.size() == t.positions.size() : "keysize and the position sizes are not same.";
+            dos.writeInt(DatabaseDescriptor.getIndexInterval());
+            dos.writeInt(t.keys.size());
+            for (int i = 0; i < t.keys.size(); i++)
+            {
+                dos.writeLong(t.positions.get(i));
+                ByteBufferUtil.writeWithLength(t.keys.get(i).key, dos);
+            }
+        }
+
+        public IndexSummary deserialize(DataInput dis) throws IOException
+        {
+            IndexSummary summary = new IndexSummary();
+            if (dis.readInt() != DatabaseDescriptor.getIndexInterval())
+                throw new IOException("Cannot read the saved summary because Index Interval changed.");
+
+            int size = dis.readInt();
+            for (int i = 0; i < size; i++)
+            {
+                long location = dis.readLong();
+                ByteBuffer key = ByteBufferUtil.readWithLength(dis);
+                summary.addEntry(StorageService.getPartitioner().decorateKey(key), location);
+            }
+            return summary;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index a18a973..a5148eb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -59,6 +59,7 @@ public abstract class SSTable
     public static final String COMPONENT_FILTER = Component.Type.FILTER.repr;
     public static final String COMPONENT_STATS = Component.Type.STATS.repr;
     public static final String COMPONENT_DIGEST = Component.Type.DIGEST.repr;
+    public static final String COMPONENT_SUMMARY = Component.Type.SUMMARY.repr;
 
     public static final String TEMPFILE_MARKER = "tmp";
 
@@ -135,13 +136,14 @@ public abstract class SSTable
             FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
         for (Component component : components)
         {
-            if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER))
+            if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER) || component.equals(Component.SUMMARY))
                 continue;
 
             FileUtils.deleteWithConfirm(desc.filenameFor(component));
         }
         // remove the COMPACTED_MARKER component last if it exists
         FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
+        FileUtils.delete(desc.filenameFor(Component.SUMMARY));
 
         logger.debug("Deleted {}", desc);
         return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 6108f3f..68cee15 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -338,79 +338,125 @@ public class SSTableReader extends SSTable
                                           : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 
         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
-        RandomAccessReader input = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
-        DecoratedKey left = null, right = null;
+        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+
+        // try to load summaries from the disk and check if we need
+        // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
+        final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
+        final boolean readIndex = recreatebloom || cacheLoading || !summaryLoaded;
         try
         {
-            long indexSize = input.length();
+            long indexSize = primaryIndex.length();
             long histogramCount = sstableMetadata.estimatedRowSize.count();
             long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
                                ? histogramCount
-                               : estimateRowsFromIndex(input); // statistics is supposed to be optional
-            indexSummary = new IndexSummary(estimatedKeys);
+                               : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
             if (recreatebloom)
                 bf = LegacyBloomFilter.getFilter(estimatedKeys, 15);
 
-            while (true)
-            {
-                long indexPosition = input.getFilePointer();
-                if (indexPosition == indexSize)
-                    break;
-
-                DecoratedKey decoratedKey = null;
-                int len = ByteBufferUtil.readShortLength(input);
-
-                // when primary index file contains info other than data position, there is noway to determine
-                // the last key without deserializing index entry
-                boolean firstKey = left == null;
-                boolean lastKeyForUnpromoted = indexPosition + DBConstants.SHORT_SIZE + len + DBConstants.LONG_SIZE == indexSize;
-                boolean shouldAddEntry = indexSummary.shouldAddEntry();
-                if (shouldAddEntry || cacheLoading || recreatebloom || firstKey || lastKeyForUnpromoted || descriptor.hasPromotedIndexes)
-                {
-                    decoratedKey = decodeKey(partitioner, descriptor, ByteBufferUtil.read(input, len));
-                    if (firstKey)
-                        left = decoratedKey;
-                    right = decoratedKey;
-                }
-                else
-                {
-                    FileUtils.skipBytesFully(input, len);
-                }
+            if (!summaryLoaded)
+                indexSummary = new IndexSummary(estimatedKeys);
 
-                RowIndexEntry indexEntry = null;
-                if (decoratedKey != null)
+            long indexPosition;
+            while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
+            {
+                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+                RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor);
+                DecoratedKey decoratedKey = decodeKey(partitioner, descriptor, key);
+                if(null == first)
+                    first = decoratedKey;
+                last = decoratedKey;
+
+                if (recreatebloom)
+                    bf.add(decoratedKey.key);
+                if (cacheLoading && keysToLoadInCache.contains(decoratedKey))
+                    cacheKey(decoratedKey, indexEntry);
+
+                // if summary was already read from disk we don't want to re-populate it using primary index
+                if (!summaryLoaded)
                 {
-                    if (recreatebloom)
-                        bf.add(decoratedKey.key);
-                    if (shouldAddEntry)
-                        indexSummary.addEntry(decoratedKey, indexPosition);
-                    // if key cache could be used and we have key already pre-loaded
-                    if (cacheLoading && keysToLoadInCache.contains(decoratedKey))
-                    {
-                        indexEntry = RowIndexEntry.serializer.deserialize(input, descriptor);
-                        cacheKey(decoratedKey, indexEntry);
-                    }
+                    indexSummary.maybeAddEntry(decoratedKey, indexPosition);
+                    ibuilder.addPotentialBoundary(indexPosition);
+                    dbuilder.addPotentialBoundary(indexEntry.position);
                 }
-                if (indexEntry == null)
-                    indexEntry = RowIndexEntry.serializer.deserializePositionOnly(input, descriptor);
-
-                indexSummary.incrementRowid();
-                ibuilder.addPotentialBoundary(indexPosition);
-                dbuilder.addPotentialBoundary(indexEntry.position);
             }
-            indexSummary.complete();
         }
         finally
         {
-            FileUtils.closeQuietly(input);
+            FileUtils.closeQuietly(primaryIndex);
         }
-        this.first = getMinimalKey(left);
-        this.last = getMinimalKey(right);
-        assert this.first.compareTo(this.last) <= 0: String.format("SSTable first key %s > last key %s", this.first, this.last);
-
+        first = getMinimalKey(first);
+        last = getMinimalKey(last);
+        // finalize the load.
+        indexSummary.complete();
         // finalize the state of the reader
         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+        if (readIndex) // save summary information to disk
+            saveSummary(this, ibuilder, dbuilder);
+    }
+
+    public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+    {
+        File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
+        if (!summariesFile.exists())
+            return false;
+
+        DataInputStream iStream = null;
+        try
+        {
+            iStream = new DataInputStream(new FileInputStream(summariesFile));
+            reader.indexSummary = IndexSummary.serializer.deserialize(iStream);
+            reader.first = decodeKey(StorageService.getPartitioner(), reader.descriptor, ByteBufferUtil.readWithLength(iStream));
+            reader.last = decodeKey(StorageService.getPartitioner(), reader.descriptor, ByteBufferUtil.readWithLength(iStream));
+            ibuilder.deserializeBounds(iStream);
+            dbuilder.deserializeBounds(iStream);
+        }
+        catch (IOException e)
+        {
+            logger.debug("Cannot deserialize SSTable Summary: ", e);
+            // summary is unreadable or stale: delete it so the caller rebuilds it from the primary index.
+            if (summariesFile.exists())
+                summariesFile.delete();
+
+            return false;
+        }
+        finally
+        {
+            FileUtils.closeQuietly(iStream);
+        }
+
+        return true;
+    }
+
+    public static void saveSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+    {
+        File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
+        if (summariesFile.exists())
+            summariesFile.delete();
+
+        DataOutputStream oStream = null;
+        try
+        {
+            oStream = new DataOutputStream(new FileOutputStream(summariesFile));
+            IndexSummary.serializer.serialize(reader.indexSummary, oStream);
+            ByteBufferUtil.writeWithLength(reader.first.key, oStream);
+            ByteBufferUtil.writeWithLength(reader.last.key, oStream);
+            ibuilder.serializeBounds(oStream);
+            dbuilder.serializeBounds(oStream);
+        }
+        catch (IOException e)
+        {
+            logger.debug("Cannot save SSTable Summary: ", e);
+
+            // the write failed part-way: delete the partial file so a later load does not read it.
+            if (summariesFile.exists())
+                summariesFile.delete();
+        }
+        finally
+        {
+            FileUtils.closeQuietly(oStream);
+        }
     }
 
     /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index e46e407..1a225e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -61,7 +61,12 @@ public class SSTableWriter extends SSTable
 
     private static Set<Component> components(CFMetaData metadata)
     {
-        Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS));
+        Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
+                                                                         Component.FILTER,
+                                                                         Component.PRIMARY_INDEX,
+                                                                         Component.STATS,
+                                                                         Component.SUMMARY));
+
         if (metadata.compressionParameters().sstableCompressor != null)
             components.add(Component.COMPRESSION_INFO);
         else
@@ -303,6 +308,8 @@ public class SSTableWriter extends SSTable
                                                            sstableMetadata);
         sstable.first = getMinimalKey(first);
         sstable.last = getMinimalKey(last);
+        // try to save the summaries to disk
+        SSTableReader.saveSummary(sstable, iwriter.builder, dbuilder);
         iwriter = null;
         dbuilder = null;
         return sstable;
@@ -342,7 +349,8 @@ public class SSTableWriter extends SSTable
         try
         {
             // do -Data last because -Data present should mean the sstable was completely renamed before crash
-            for (Component component : Sets.difference(components, Collections.singleton(Component.DATA)))
+            // don't rename the -Summary component: it is not created yet; it is created when the SSTable is loaded.
+            for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
                 FBUtilities.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
             FBUtilities.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 3803963..ae81a08 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
@@ -147,7 +149,7 @@ public class MmappedSegmentedFile extends SegmentedFile
     static class Builder extends SegmentedFile.Builder
     {
         // planned segment boundaries
-        private final List<Long> boundaries;
+        private List<Long> boundaries;
 
         // offset of the open segment (first segment begins at 0).
         private long currentStart = 0;
@@ -193,7 +195,8 @@ public class MmappedSegmentedFile extends SegmentedFile
         {
             long length = new File(path).length();
             // add a sentinel value == length
-            boundaries.add(Long.valueOf(length));
+            if (length != boundaries.get(boundaries.size() - 1))
+                boundaries.add(length);
             // create the segments
             return new MmappedSegmentedFile(path, length, createSegments(path));
         }
@@ -226,5 +229,27 @@ public class MmappedSegmentedFile extends SegmentedFile
             }
             return segments;
         }
+
+        @Override
+        public void serializeBounds(DataOutput dos) throws IOException
+        {
+            super.serializeBounds(dos);
+            dos.writeInt(boundaries.size());
+            for (long position: boundaries)
+                dos.writeLong(position);
+        }
+
+        @Override
+        public void deserializeBounds(DataInput dis) throws IOException
+        {
+            super.deserializeBounds(dis);
+            List<Long> temp = new ArrayList<Long>();
+
+            int size = dis.readInt();
+            for (int i = 0; i < size; i++)
+                temp.add(dis.readLong());
+
+            boundaries = temp;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index fd8bfcd..03de78b 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOError;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
@@ -24,6 +26,7 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -106,6 +109,17 @@ public abstract class SegmentedFile
          * @param path The file on disk.
          */
         public abstract SegmentedFile complete(String path);
+
+        public void serializeBounds(DataOutput dos) throws IOException
+        {
+            dos.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());
+        }
+
+        public void deserializeBounds(DataInput dis) throws IOException
+        {
+            if (!dis.readUTF().equals(DatabaseDescriptor.getDiskAccessMode().name()))
+                throw new IOException("Cannot deserialize SSTable Summary component because the DiskAccessMode was changed!");
+        }
     }
 
     static final class Segment extends Pair<Long, MappedByteBuffer> implements Comparable<Segment>