You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/04/24 01:49:32 UTC
git commit: Save IndexSummary into new SSTable 'Summary' component
patch by Vijay and Pavel Yaskevich;
reviewed by Pavel Yaskevich for CASSANDRA-2392
Updated Branches:
refs/heads/trunk 7b3349f6e -> 048741868
Save IndexSummary into new SSTable 'Summary' component
patch by Vijay and Pavel Yaskevich; reviewed by Pavel Yaskevich for CASSANDRA-2392
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/04874186
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/04874186
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/04874186
Branch: refs/heads/trunk
Commit: 04874186892c86a20181a2f64c5dc24285021b2c
Parents: 7b3349f
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Mon Apr 23 16:28:14 2012 -0700
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Mon Apr 23 16:28:14 2012 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/io/sstable/Component.java | 6 +-
.../apache/cassandra/io/sstable/IndexSummary.java | 44 ++++
.../org/apache/cassandra/io/sstable/SSTable.java | 4 +-
.../apache/cassandra/io/sstable/SSTableReader.java | 154 ++++++++++-----
.../apache/cassandra/io/sstable/SSTableWriter.java | 12 +-
.../cassandra/io/util/MmappedSegmentedFile.java | 29 +++-
.../apache/cassandra/io/util/SegmentedFile.java | 14 ++
8 files changed, 204 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0ecba36..7538980 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
* (CLI) track elapsed time for `delete' operation (CASSANDRA-4060)
* (CLI) jline version is bumped to 1.0 to properly support
'delete' key function (CASSANDRA-4132)
+ * Save IndexSummary into new SSTable 'Summary' component (CASSANDRA-2392)
1.1.1-dev
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 1f34222..45f2875 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -50,7 +50,9 @@ public class Component
// statistical metadata about the content of the sstable
STATS("Statistics.db"),
// holds sha1 sum of the data file (to be checked by sha1sum)
- DIGEST("Digest.sha1");
+ DIGEST("Digest.sha1"),
+ // holds SSTable Index Summary and Boundaries
+ SUMMARY("Summary.db");
final String repr;
Type(String repr)
@@ -75,6 +77,7 @@ public class Component
public final static Component COMPRESSION_INFO = new Component(Type.COMPRESSION_INFO, -1);
public final static Component STATS = new Component(Type.STATS, -1);
public final static Component DIGEST = new Component(Type.DIGEST, -1);
+ public final static Component SUMMARY = new Component(Type.SUMMARY, -1);
public final Type type;
public final int id;
@@ -122,6 +125,7 @@ public class Component
case COMPRESSION_INFO: component = Component.COMPRESSION_INFO; break;
case STATS: component = Component.STATS; break;
case DIGEST: component = Component.DIGEST; break;
+ case SUMMARY: component = Component.SUMMARY; break;
default:
throw new IllegalStateException();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index e36bc90..3cac781 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -17,11 +17,17 @@
*/
package org.apache.cassandra.io.sstable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Two approaches to building an IndexSummary:
@@ -30,6 +36,7 @@ import org.apache.cassandra.db.DecoratedKey;
*/
public class IndexSummary
{
+ public static final IndexSummarySerializer serializer = new IndexSummarySerializer();
private final ArrayList<Long> positions;
private final ArrayList<DecoratedKey> keys;
private long keysWritten = 0;
@@ -44,6 +51,12 @@ public class IndexSummary
keys = new ArrayList<DecoratedKey>((int)expectedEntries);
}
+ private IndexSummary()
+ {
+ positions = new ArrayList<Long>();
+ keys = new ArrayList<DecoratedKey>();
+ }
+
public void incrementRowid()
{
keysWritten++;
@@ -82,4 +95,35 @@ public class IndexSummary
keys.trimToSize();
positions.trimToSize();
}
+
+ public static class IndexSummarySerializer
+ {
+ public void serialize(IndexSummary t, DataOutput dos) throws IOException
+ {
+ assert t.keys.size() == t.positions.size() : "keysize and the position sizes are not same.";
+ dos.writeInt(DatabaseDescriptor.getIndexInterval());
+ dos.writeInt(t.keys.size());
+ for (int i = 0; i < t.keys.size(); i++)
+ {
+ dos.writeLong(t.positions.get(i));
+ ByteBufferUtil.writeWithLength(t.keys.get(i).key, dos);
+ }
+ }
+
+ public IndexSummary deserialize(DataInput dis) throws IOException
+ {
+ IndexSummary summary = new IndexSummary();
+ if (dis.readInt() != DatabaseDescriptor.getIndexInterval())
+ throw new IOException("Cannot read the saved summary because Index Interval changed.");
+
+ int size = dis.readInt();
+ for (int i = 0; i < size; i++)
+ {
+ long location = dis.readLong();
+ ByteBuffer key = ByteBufferUtil.readWithLength(dis);
+ summary.addEntry(StorageService.getPartitioner().decorateKey(key), location);
+ }
+ return summary;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index a18a973..a5148eb 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -59,6 +59,7 @@ public abstract class SSTable
public static final String COMPONENT_FILTER = Component.Type.FILTER.repr;
public static final String COMPONENT_STATS = Component.Type.STATS.repr;
public static final String COMPONENT_DIGEST = Component.Type.DIGEST.repr;
+ public static final String COMPONENT_SUMMARY = Component.Type.SUMMARY.repr;
public static final String TEMPFILE_MARKER = "tmp";
@@ -135,13 +136,14 @@ public abstract class SSTable
FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
for (Component component : components)
{
- if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER))
+ if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER) || component.equals(Component.SUMMARY))
continue;
FileUtils.deleteWithConfirm(desc.filenameFor(component));
}
// remove the COMPACTED_MARKER component last if it exists
FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
+ FileUtils.delete(desc.filenameFor(Component.SUMMARY));
logger.debug("Deleted {}", desc);
return true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 6108f3f..68cee15 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -338,79 +338,125 @@ public class SSTableReader extends SSTable
: SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
- RandomAccessReader input = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
- DecoratedKey left = null, right = null;
+ RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+
+ // try to load summaries from the disk and check if we need
+ // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
+ final boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder);
+ final boolean readIndex = recreatebloom || cacheLoading || !summaryLoaded;
try
{
- long indexSize = input.length();
+ long indexSize = primaryIndex.length();
long histogramCount = sstableMetadata.estimatedRowSize.count();
long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
? histogramCount
- : estimateRowsFromIndex(input); // statistics is supposed to be optional
- indexSummary = new IndexSummary(estimatedKeys);
+ : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional
if (recreatebloom)
bf = LegacyBloomFilter.getFilter(estimatedKeys, 15);
- while (true)
- {
- long indexPosition = input.getFilePointer();
- if (indexPosition == indexSize)
- break;
-
- DecoratedKey decoratedKey = null;
- int len = ByteBufferUtil.readShortLength(input);
-
- // when primary index file contains info other than data position, there is noway to determine
- // the last key without deserializing index entry
- boolean firstKey = left == null;
- boolean lastKeyForUnpromoted = indexPosition + DBConstants.SHORT_SIZE + len + DBConstants.LONG_SIZE == indexSize;
- boolean shouldAddEntry = indexSummary.shouldAddEntry();
- if (shouldAddEntry || cacheLoading || recreatebloom || firstKey || lastKeyForUnpromoted || descriptor.hasPromotedIndexes)
- {
- decoratedKey = decodeKey(partitioner, descriptor, ByteBufferUtil.read(input, len));
- if (firstKey)
- left = decoratedKey;
- right = decoratedKey;
- }
- else
- {
- FileUtils.skipBytesFully(input, len);
- }
+ if (!summaryLoaded)
+ indexSummary = new IndexSummary(estimatedKeys);
- RowIndexEntry indexEntry = null;
- if (decoratedKey != null)
+ long indexPosition;
+ while (readIndex && (indexPosition = primaryIndex.getFilePointer()) != indexSize)
+ {
+ ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
+ RowIndexEntry indexEntry = RowIndexEntry.serializer.deserialize(primaryIndex, descriptor);
+ DecoratedKey decoratedKey = decodeKey(partitioner, descriptor, key);
+ if(null == first)
+ first = decoratedKey;
+ last = decoratedKey;
+
+ if (recreatebloom)
+ bf.add(decoratedKey.key);
+ if (cacheLoading && keysToLoadInCache.contains(decoratedKey))
+ cacheKey(decoratedKey, indexEntry);
+
+ // if summary was already read from disk we don't want to re-populate it using primary index
+ if (!summaryLoaded)
{
- if (recreatebloom)
- bf.add(decoratedKey.key);
- if (shouldAddEntry)
- indexSummary.addEntry(decoratedKey, indexPosition);
- // if key cache could be used and we have key already pre-loaded
- if (cacheLoading && keysToLoadInCache.contains(decoratedKey))
- {
- indexEntry = RowIndexEntry.serializer.deserialize(input, descriptor);
- cacheKey(decoratedKey, indexEntry);
- }
+ indexSummary.maybeAddEntry(decoratedKey, indexPosition);
+ ibuilder.addPotentialBoundary(indexPosition);
+ dbuilder.addPotentialBoundary(indexEntry.position);
}
- if (indexEntry == null)
- indexEntry = RowIndexEntry.serializer.deserializePositionOnly(input, descriptor);
-
- indexSummary.incrementRowid();
- ibuilder.addPotentialBoundary(indexPosition);
- dbuilder.addPotentialBoundary(indexEntry.position);
}
- indexSummary.complete();
}
finally
{
- FileUtils.closeQuietly(input);
+ FileUtils.closeQuietly(primaryIndex);
}
- this.first = getMinimalKey(left);
- this.last = getMinimalKey(right);
- assert this.first.compareTo(this.last) <= 0: String.format("SSTable first key %s > last key %s", this.first, this.last);
-
+ first = getMinimalKey(first);
+ last = getMinimalKey(last);
+ // finalize the load.
+ indexSummary.complete();
// finalize the state of the reader
ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+ if (readIndex) // save summary information to disk
+ saveSummary(this, ibuilder, dbuilder);
+ }
+
+ public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
+ if (!summariesFile.exists())
+ return false;
+
+ DataInputStream iStream = null;
+ try
+ {
+ iStream = new DataInputStream(new FileInputStream(summariesFile));
+ reader.indexSummary = IndexSummary.serializer.deserialize(iStream);
+ reader.first = decodeKey(StorageService.getPartitioner(), reader.descriptor, ByteBufferUtil.readWithLength(iStream));
+ reader.last = decodeKey(StorageService.getPartitioner(), reader.descriptor, ByteBufferUtil.readWithLength(iStream));
+ ibuilder.deserializeBounds(iStream);
+ dbuilder.deserializeBounds(iStream);
+ }
+ catch (IOException e)
+ {
+ logger.debug("Cannot deserialize SSTable Summary: ", e);
+ // the saved summary is unreadable or stale: delete it so that it gets rebuilt from the primary index.
+ if (summariesFile.exists())
+ summariesFile.delete();
+
+ return false;
+ }
+ finally
+ {
+ FileUtils.closeQuietly(iStream);
+ }
+
+ return true;
+ }
+
+ public static void saveSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY));
+ if (summariesFile.exists())
+ summariesFile.delete();
+
+ DataOutputStream oStream = null;
+ try
+ {
+ oStream = new DataOutputStream(new FileOutputStream(summariesFile));
+ IndexSummary.serializer.serialize(reader.indexSummary, oStream);
+ ByteBufferUtil.writeWithLength(reader.first.key, oStream);
+ ByteBufferUtil.writeWithLength(reader.last.key, oStream);
+ ibuilder.serializeBounds(oStream);
+ dbuilder.serializeBounds(oStream);
+ }
+ catch (IOException e)
+ {
+ logger.debug("Cannot save SSTable Summary: ", e);
+
+ // the write failed part-way: delete the incomplete file so that a later load rebuilds the summary.
+ if (summariesFile.exists())
+ summariesFile.delete();
+ }
+ finally
+ {
+ FileUtils.closeQuietly(oStream);
+ }
}
/** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index e46e407..1a225e4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -61,7 +61,12 @@ public class SSTableWriter extends SSTable
private static Set<Component> components(CFMetaData metadata)
{
- Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS));
+ Set<Component> components = new HashSet<Component>(Arrays.asList(Component.DATA,
+ Component.FILTER,
+ Component.PRIMARY_INDEX,
+ Component.STATS,
+ Component.SUMMARY));
+
if (metadata.compressionParameters().sstableCompressor != null)
components.add(Component.COMPRESSION_INFO);
else
@@ -303,6 +308,8 @@ public class SSTableWriter extends SSTable
sstableMetadata);
sstable.first = getMinimalKey(first);
sstable.last = getMinimalKey(last);
+ // try to save the summaries to disk
+ SSTableReader.saveSummary(sstable, iwriter.builder, dbuilder);
iwriter = null;
dbuilder = null;
return sstable;
@@ -342,7 +349,8 @@ public class SSTableWriter extends SSTable
try
{
// do -Data last because -Data present should mean the sstable was completely renamed before crash
- for (Component component : Sets.difference(components, Collections.singleton(Component.DATA)))
+ // don't rename the -Summary component: it has not been created yet and is only created when the SSTable is loaded.
+ for (Component component : Sets.difference(components, Sets.newHashSet(Component.DATA, Component.SUMMARY)))
FBUtilities.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
FBUtilities.renameWithConfirm(tmpdesc.filenameFor(Component.DATA), newdesc.filenameFor(Component.DATA));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 3803963..ae81a08 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.io.util;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.File;
import java.io.IOError;
import java.io.IOException;
@@ -147,7 +149,7 @@ public class MmappedSegmentedFile extends SegmentedFile
static class Builder extends SegmentedFile.Builder
{
// planned segment boundaries
- private final List<Long> boundaries;
+ private List<Long> boundaries;
// offset of the open segment (first segment begins at 0).
private long currentStart = 0;
@@ -193,7 +195,8 @@ public class MmappedSegmentedFile extends SegmentedFile
{
long length = new File(path).length();
// add a sentinel value == length
- boundaries.add(Long.valueOf(length));
+ if (length != boundaries.get(boundaries.size() - 1))
+ boundaries.add(length);
// create the segments
return new MmappedSegmentedFile(path, length, createSegments(path));
}
@@ -226,5 +229,27 @@ public class MmappedSegmentedFile extends SegmentedFile
}
return segments;
}
+
+ @Override
+ public void serializeBounds(DataOutput dos) throws IOException
+ {
+ super.serializeBounds(dos);
+ dos.writeInt(boundaries.size());
+ for (long position: boundaries)
+ dos.writeLong(position);
+ }
+
+ @Override
+ public void deserializeBounds(DataInput dis) throws IOException
+ {
+ super.deserializeBounds(dis);
+ List<Long> temp = new ArrayList<Long>();
+
+ int size = dis.readInt();
+ for (int i = 0; i < size; i++)
+ temp.add(dis.readLong());
+
+ boundaries = temp;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04874186/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index fd8bfcd..03de78b 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.io.util;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOError;
import java.io.IOException;
import java.nio.MappedByteBuffer;
@@ -24,6 +26,7 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.Pair;
/**
@@ -106,6 +109,17 @@ public abstract class SegmentedFile
* @param path The file on disk.
*/
public abstract SegmentedFile complete(String path);
+
+ public void serializeBounds(DataOutput dos) throws IOException
+ {
+ dos.writeUTF(DatabaseDescriptor.getDiskAccessMode().name());
+ }
+
+ public void deserializeBounds(DataInput dis) throws IOException
+ {
+ if (!dis.readUTF().equals(DatabaseDescriptor.getDiskAccessMode().name()))
+ throw new IOException("Cannot deserialize SSTable Summary component because the DiskAccessMode was changed!");
+ }
}
static final class Segment extends Pair<Long, MappedByteBuffer> implements Comparable<Segment>