Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/12/13 23:52:55 UTC
[2/2] git commit: SSTable metadata (Stats.db) format change
SSTable metadata (Stats.db) format change
patch by yukim; reviewed by thobbs for CASSANDRA-6356
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/74bf5aa1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/74bf5aa1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/74bf5aa1
Branch: refs/heads/trunk
Commit: 74bf5aa16e7080360febca1745307a4d7ced32dc
Parents: 84d85ee
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Dec 13 16:33:26 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Dec 13 16:33:26 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 14 +-
.../org/apache/cassandra/db/DataTracker.java | 9 +-
src/java/org/apache/cassandra/db/Memtable.java | 4 +-
.../cassandra/db/commitlog/ReplayPosition.java | 4 +-
.../db/compaction/CompactionManager.java | 3 +-
.../cassandra/db/compaction/CompactionTask.java | 3 +-
.../db/compaction/LeveledManifest.java | 31 +-
.../cassandra/db/compaction/Upgrader.java | 3 +-
.../io/compress/CompressedSequentialWriter.java | 9 +-
.../io/sstable/AbstractSSTableSimpleWriter.java | 3 +-
.../apache/cassandra/io/sstable/Descriptor.java | 50 +-
.../cassandra/io/sstable/IndexSummary.java | 9 +-
.../cassandra/io/sstable/SSTableMetadata.java | 518 -------------------
.../cassandra/io/sstable/SSTableReader.java | 183 ++++---
.../cassandra/io/sstable/SSTableWriter.java | 32 +-
.../io/sstable/metadata/CompactionMetadata.java | 93 ++++
.../metadata/IMetadataComponentSerializer.java | 58 +++
.../sstable/metadata/IMetadataSerializer.java | 68 +++
.../metadata/LegacyMetadataSerializer.java | 156 ++++++
.../io/sstable/metadata/MetadataCollector.java | 220 ++++++++
.../io/sstable/metadata/MetadataComponent.java | 34 ++
.../io/sstable/metadata/MetadataSerializer.java | 144 ++++++
.../io/sstable/metadata/MetadataType.java | 38 ++
.../io/sstable/metadata/StatsMetadata.java | 253 +++++++++
.../io/sstable/metadata/ValidationMetadata.java | 91 ++++
.../cassandra/metrics/ColumnFamilyMetrics.java | 6 +-
.../cassandra/tools/SSTableLevelResetter.java | 11 +-
.../cassandra/tools/SSTableMetadataViewer.java | 34 +-
.../cassandra/utils/EstimatedHistogram.java | 14 +-
.../cassandra/utils/StreamingHistogram.java | 20 +-
.../cassandra/db/ColumnFamilyStoreTest.java | 3 +-
.../LeveledCompactionStrategyTest.java | 2 +-
.../CompressedRandomAccessReaderTest.java | 6 +-
.../cassandra/io/sstable/IndexSummaryTest.java | 6 +-
.../sstable/SSTableMetadataSerializerTest.java | 90 ----
.../metadata/MetadataSerializerTest.java | 88 ++++
.../compress/CompressedInputStreamTest.java | 4 +-
38 files changed, 1505 insertions(+), 810 deletions(-)
----------------------------------------------------------------------
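For context: CASSANDRA-6356 replaces the single Stats.db blob (the SSTableMetadata class deleted below) with a map of typed metadata components, VALIDATION, COMPACTION and STATS, each with its own serializer, fronted by a small table of contents so one component can be read or rewritten without deserializing the rest. The following is a minimal, self-contained Java sketch of that table-of-contents idea; the enum values mirror the new MetadataType file listed above, but the writer class and its body are illustrative, not the committed MetadataSerializer.

    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    enum MetadataType { VALIDATION, COMPACTION, STATS }

    final class TocWriterSketch
    {
        // Write count, then (type, offset) pairs, then the component bodies.
        // A reader wanting one component parses the header, looks up the
        // offset for its type and seeks straight to it. A LinkedHashMap is
        // used so header order and body order agree.
        static void serialize(LinkedHashMap<MetadataType, byte[]> components, DataOutput out) throws IOException
        {
            out.writeInt(components.size());
            int offset = 4 + components.size() * 8; // count (int) + one (int, int) pair per component
            for (Map.Entry<MetadataType, byte[]> entry : components.entrySet())
            {
                out.writeInt(entry.getKey().ordinal());
                out.writeInt(offset);
                offset += entry.getValue().length;
            }
            for (byte[] body : components.values())
                out.write(body);
        }
    }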
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e00201..4c74ea9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@
* Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
* Batch read from OTC's queue and cleanup (CASSANDRA-1632)
* Secondary index support for collections (CASSANDRA-4511)
+ * SSTable metadata (Stats.db) format change (CASSANDRA-6356)
2.0.4
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 4e54af0..a98e30b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -62,6 +62,8 @@ import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.ColumnFamilyMetrics;
import org.apache.cassandra.service.CacheService;
@@ -486,7 +488,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Directories directories = Directories.create(keyspace, columnfamily);
// sanity-check unfinishedGenerations
- Set<Integer> allGenerations = new HashSet<Integer>();
+ Set<Integer> allGenerations = new HashSet<>();
for (Descriptor desc : directories.sstableLister().list().keySet())
allGenerations.add(desc.generation);
if (!allGenerations.containsAll(unfinishedGenerations))
@@ -497,7 +499,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// remove new sstables from compactions that didn't complete, and compute
// set of ancestors that shouldn't exist anymore
- Set<Integer> completedAncestors = new HashSet<Integer>();
+ Set<Integer> completedAncestors = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
{
Descriptor desc = sstableFiles.getKey();
@@ -506,7 +508,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Set<Integer> ancestors;
try
{
- ancestors = SSTableMetadata.serializer.deserialize(desc).right;
+ CompactionMetadata compactionMetadata = (CompactionMetadata) desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION);
+ ancestors = compactionMetadata.ancestors;
}
catch (IOException e)
{
@@ -595,10 +598,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
if (new File(descriptor.filenameFor(Component.STATS)).exists())
- {
- Pair<SSTableMetadata, Set<Integer>> oldMetadata = SSTableMetadata.serializer.deserialize(descriptor);
- LeveledManifest.mutateLevel(oldMetadata, descriptor, descriptor.filenameFor(Component.STATS), 0);
- }
+ descriptor.getMetadataSerializer().mutateLevel(descriptor, 0);
}
catch (IOException e)
{
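The recovery path above now pulls only the COMPACTION component (the ancestor set) off disk instead of deserializing the whole stats blob. A hedged restatement of the call pattern, using the types imported in this hunk; the null guard is a defensive assumption, not something the commit shows:

    CompactionMetadata compactionMetadata = (CompactionMetadata)
            desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION);
    // Older sstables may carry no compaction metadata; treat that as "no ancestors".
    Set<Integer> ancestors = compactionMetadata == null
                           ? Collections.<Integer>emptySet()
                           : compactionMetadata.ancestors;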
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 64b088d..692c5c5 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.StorageMetrics;
@@ -460,11 +459,7 @@ public class DataTracker
allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds());
allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count();
}
- if (allColumns > 0)
- {
- return allDroppable / allColumns;
- }
- return 0;
+ return allColumns > 0 ? allDroppable / allColumns : 0;
}
public void notifySSTablesChanged(Collection<SSTableReader> removed, Collection<SSTableReader> added, OperationType compactionType)
@@ -630,4 +625,4 @@ public class DataTracker
return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", memtablesPendingFlush.size(), sstables, compacting);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 064851a..785f0c2 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -35,9 +35,9 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.utils.Allocator;
import org.github.jamm.MemoryMeter;
@@ -381,7 +381,7 @@ public class Memtable
public SSTableWriter createFlushWriter(String filename) throws ExecutionException, InterruptedException
{
- SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(cfs.metadata.comparator).replayPosition(context.get());
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context.get());
return new SSTableWriter(filename,
rows.size(),
cfs.metadata,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
index 354444b..fb78ed3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
+++ b/src/java/org/apache/cassandra/db/commitlog/ReplayPosition.java
@@ -130,9 +130,9 @@ public class ReplayPosition implements Comparable<ReplayPosition>
return new ReplayPosition(in.readLong(), in.readInt());
}
- public long serializedSize(ReplayPosition object, TypeSizes typeSizes)
+ public long serializedSize(ReplayPosition rp, TypeSizes typeSizes)
{
- throw new UnsupportedOperationException();
+ return typeSizes.sizeof(rp.segment) + typeSizes.sizeof(rp.position);
}
}
}
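serializedSize() stops throwing because the new metadata serializer has to know each component's size up front to lay out its table of contents, and ReplayPosition is embedded in StatsMetadata. The arithmetic, restated as a sketch (assuming native fixed-width primitive sizes):

    // One ReplayPosition on disk: a long segment plus an int position,
    // i.e. 8 + 4 = 12 bytes with native encoding.
    long size = typeSizes.sizeof(rp.segment) + typeSizes.sizeof(rp.position);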
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5bcaca9..2090b6f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.repair.Validator;
@@ -738,7 +739,7 @@ public class CompactionManager implements CompactionManagerMBean
expectedBloomFilterSize,
cfs.metadata,
cfs.partitioner,
- SSTableMetadata.createCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel()));
+ new MetadataCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel()));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f4cc500..59f2f2f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.CloseableIterator;
public class CompactionTask extends AbstractCompactionTask
@@ -281,7 +282,7 @@ public class CompactionTask extends AbstractCompactionTask
keysPerSSTable,
cfs.metadata,
cfs.partitioner,
- SSTableMetadata.createCollector(toCompact, cfs.metadata.comparator, getLevel()));
+ new MetadataCollector(toCompact, cfs.metadata.comparator, getLevel()));
}
protected int getLevel()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 232d1f7..2ec42e4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -188,7 +188,7 @@ public class LeveledManifest
String metaDataFile = sstable.descriptor.filenameFor(Component.STATS);
try
{
- mutateLevel(Pair.create(sstable.getSSTableMetadata(), sstable.getAncestors()), sstable.descriptor, metaDataFile, 0);
+ sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
sstable.reloadSSTableMetadata();
add(sstable);
}
@@ -571,33 +571,4 @@ public class LeveledManifest
return newLevel;
}
-
- /**
- * Scary method mutating existing sstable component
- *
- * Tries to do it safely by moving the new file on top of the old one
- *
- * Caller needs to reload the sstable metadata (sstableReader.reloadSSTableMetadata())
- *
- * @see org.apache.cassandra.io.sstable.SSTableReader#reloadSSTableMetadata()
- *
- * @param oldMetadata
- * @param descriptor
- * @param filename
- * @param level
- * @throws IOException
- */
- public static synchronized void mutateLevel(Pair<SSTableMetadata, Set<Integer>> oldMetadata, Descriptor descriptor, String filename, int level) throws IOException
- {
- logger.debug("Mutating {} to level {}", descriptor.filenameFor(Component.STATS), level);
- SSTableMetadata metadata = SSTableMetadata.copyWithNewSSTableLevel(oldMetadata.left, level);
- DataOutputStream out = new DataOutputStream(new FileOutputStream(filename + "-tmp"));
- SSTableMetadata.serializer.legacySerialize(metadata, oldMetadata.right, descriptor, out);
- out.flush();
- out.close();
- // we cant move a file on top of another file in windows:
- if (!FBUtilities.isUnix())
- FileUtils.delete(filename);
- FileUtils.renameWithConfirm(filename + "-tmp", filename);
- }
}
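mutateLevel() moves from a LeveledManifest static into the metadata layer, behind IMetadataSerializer. The replacement is the two-step sequence visible in the hunk above; the second step matters because mutateLevel rewrites only the on-disk STATS component:

    // Reset an sstable to level 0, then refresh the in-memory copy so the
    // reader observes the new level.
    sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
    sstable.reloadSSTableMetadata();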
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 383ff00..ef881c4 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -26,6 +26,7 @@ import com.google.common.base.Throwables;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.OutputHandler;
@@ -62,7 +63,7 @@ public class Upgrader
private SSTableWriter createCompactionWriter()
{
- SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector(cfs.getComparator());
+ MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.getComparator());
// Get the max timestamp of the precompacted sstables
// and adds generation of live ancestors
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 54b990f..99bbd85 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -21,13 +21,12 @@ import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.zip.Adler32;
-import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.SSTableMetadata.Collector;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileMark;
import org.apache.cassandra.io.util.SequentialWriter;
@@ -37,7 +36,7 @@ public class CompressedSequentialWriter extends SequentialWriter
String indexFilePath,
boolean skipIOCache,
CompressionParameters parameters,
- Collector sstableMetadataCollector)
+ MetadataCollector sstableMetadataCollector)
{
return new CompressedSequentialWriter(new File(dataFilePath), indexFilePath, skipIOCache, parameters, sstableMetadataCollector);
}
@@ -60,13 +59,13 @@ public class CompressedSequentialWriter extends SequentialWriter
private long originalSize = 0, compressedSize = 0;
- private final Collector sstableMetadataCollector;
+ private final MetadataCollector sstableMetadataCollector;
public CompressedSequentialWriter(File file,
String indexFilePath,
boolean skipIOCache,
CompressionParameters parameters,
- Collector sstableMetadataCollector)
+ MetadataCollector sstableMetadataCollector)
{
super(file, parameters.chunkLength(), skipIOCache);
this.compressor = parameters.sstableCompressor;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 0059fda..6018369 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.Pair;
@@ -56,7 +57,7 @@ public abstract class AbstractSSTableSimpleWriter
0, // We don't care about the bloom filter
metadata,
DatabaseDescriptor.getPartitioner(),
- SSTableMetadata.createCollector(metadata.comparator));
+ new MetadataCollector(metadata.comparator));
}
// find available generation and pick up filename from that
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index fef6a1e..6f84296 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -22,6 +22,9 @@ import java.util.StringTokenizer;
import com.google.common.base.Objects;
+import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
+import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
+import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
import org.apache.cassandra.utils.Pair;
import static org.apache.cassandra.io.sstable.Component.separator;
@@ -43,8 +46,8 @@ public class Descriptor
// we always incremented the major version.
public static class Version
{
- // This needs to be at the beginning for initialization sake
- public static final String current_version = "jc";
+ // This needs to be at the beginning for initialization sake
+ public static final String current_version = "ka";
// ic (1.2.5): omits per-row bloom filter of column names
// ja (2.0.0): super columns are serialized as composites (note that there is no real format change,
@@ -57,7 +60,8 @@ public class Descriptor
// tracks max/min column values (according to comparator)
// jb (2.0.1): switch from crc32 to adler32 for compression checksums
// checksum the compressed data
- // jc (2.1.0): index summaries can be downsampled and the sampling level is persisted
+ // ka (2.1.0): new Statistics.db file format
+ // index summaries can be downsampled and the sampling level is persisted
public static final Version CURRENT = new Version(current_version);
@@ -72,6 +76,7 @@ public class Descriptor
public final boolean tracksMaxMinColumnNames;
public final boolean hasPostCompressionAdlerChecksums;
public final boolean hasSamplingLevel;
+ public final boolean newStatsFile;
public Version(String version)
{
@@ -84,7 +89,8 @@ public class Descriptor
hasRowSizeAndColumnCount = version.compareTo("ja") < 0;
tracksMaxMinColumnNames = version.compareTo("ja") >= 0;
hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
- hasSamplingLevel = version.compareTo("jc") >= 0;
+ hasSamplingLevel = version.compareTo("ka") >= 0;
+ newStatsFile = version.compareTo("ka") >= 0;
}
/**
@@ -102,11 +108,6 @@ public class Descriptor
return version.compareTo("ic") >= 0 && version.charAt(0) <= CURRENT.version.charAt(0);
}
- public boolean isStreamCompatible()
- {
- return isCompatible() && version.charAt(0) >= 'j';
- }
-
@Override
public String toString()
{
@@ -116,11 +117,7 @@ public class Descriptor
@Override
public boolean equals(Object o)
{
- if (o == this)
- return true;
- if (!(o instanceof Version))
- return false;
- return version.equals(((Version)o).version);
+ return o == this || o instanceof Version && version.equals(((Version) o).version);
}
@Override
@@ -256,23 +253,20 @@ public class Descriptor
return new Descriptor(version, directory, ksname, cfname, generation, temporary);
}
- /**
- * @return true if the current Cassandra version can read the given sstable version
- */
- public boolean isCompatible()
+ public IMetadataSerializer getMetadataSerializer()
{
- return version.isCompatible();
+ if (version.newStatsFile)
+ return new MetadataSerializer();
+ else
+ return new LegacyMetadataSerializer();
}
/**
- * @return true if the current Cassandra version can stream the given sstable version
- * from another node. This is stricter than opening it locally [isCompatible] because
- * streaming needs to rebuild all the non-data components, and it only knows how to write
- * the latest version.
+ * @return true if the current Cassandra version can read the given sstable version
*/
- public boolean isStreamCompatible()
+ public boolean isCompatible()
{
- return version.isStreamCompatible();
+ return version.isCompatible();
}
@Override
@@ -289,7 +283,11 @@ public class Descriptor
if (!(o instanceof Descriptor))
return false;
Descriptor that = (Descriptor)o;
- return that.directory.equals(this.directory) && that.generation == this.generation && that.ksname.equals(this.ksname) && that.cfname.equals(this.cfname) && that.temporary == this.temporary;
+ return that.directory.equals(this.directory)
+ && that.generation == this.generation
+ && that.ksname.equals(this.ksname)
+ && that.cfname.equals(this.cfname)
+ && that.temporary == this.temporary;
}
@Override
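getMetadataSerializer() is the version gate for the whole change: descriptors at version "ka" or later get the new MetadataSerializer, older ones get LegacyMetadataSerializer, and callers never branch on the format themselves. A hedged usage sketch; the filename and its 2.1-era layout are illustrative:

    // Descriptor.fromFilename parses keyspace, table, version and generation
    // out of an sstable path (assumed here; see Descriptor for the real parsing).
    Descriptor desc = Descriptor.fromFilename("/var/lib/cassandra/data/ks/cf/ks-cf-ka-1-Data.db");
    IMetadataSerializer serializer = desc.getMetadataSerializer();
    // serializer is a MetadataSerializer for this "ka" file; an "ic", "ja" or
    // "jb" sstable would get a LegacyMetadataSerializer instead.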
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 4fc4737..6d8712a 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -202,9 +202,14 @@ public class IndexSummary implements Closeable
FBUtilities.copy(new MemoryInputStream(t.bytes), out, t.bytes.size());
}
- public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel) throws IOException
+ public IndexSummary deserialize(DataInputStream in, IPartitioner partitioner, boolean haveSamplingLevel, int expectedIndexInterval) throws IOException
{
int indexInterval = in.readInt();
+ if (indexInterval != expectedIndexInterval)
+ {
+ throw new IOException(String.format("Cannot read index summary because Index Interval changed from %d to %d.",
+ indexInterval, expectedIndexInterval));
+ }
int summarySize = in.readInt();
long offheapSize = in.readLong();
int samplingLevel, fullSamplingSummarySize;
@@ -229,4 +234,4 @@ public class IndexSummary implements Closeable
{
bytes.free();
}
-}
\ No newline at end of file
+}
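An index-interval mismatch now fails inside deserialize() instead of being detected by the caller afterwards, so the caller's cleanup collapses into its generic corruption handling (catch, delete Summary.db, rebuild), as the SSTableReader.loadSummary hunk below shows. A hedged sketch of that interaction, with names borrowed from that hunk:

    try
    {
        indexSummary = IndexSummary.serializer.deserialize(
                iStream, partitioner, descriptor.version.hasSamplingLevel, metadata.getIndexInterval());
    }
    catch (IOException e)
    {
        // interval mismatch or corruption: drop the saved summary and rebuild from Index.db
        FileUtils.closeQuietly(iStream);
        FileUtils.deleteWithConfirm(summariesFile);
    }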
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java b/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
deleted file mode 100644
index 8ddfdd7..0000000
--- a/src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
+++ /dev/null
@@ -1,518 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.StreamingHistogram;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.EstimatedHistogram;
-
-/**
- * Metadata for a SSTable.
- * Metadata includes:
- * - estimated row size histogram
- * - estimated column count histogram
- * - replay position
- * - max column timestamp
- * - max local deletion time
- * - bloom filter fp chance
- * - compression ratio
- * - partitioner
- * - generations of sstables from which this sstable was compacted, if any
- * - tombstone drop time histogram
- *
- * An SSTableMetadata should be instantiated via the Collector, openFromDescriptor()
- * or createDefaultInstance()
- */
-public class SSTableMetadata
-{
- public static final double NO_BLOOM_FLITER_FP_CHANCE = -1.0;
- public static final double NO_COMPRESSION_RATIO = -1.0;
- public static final SSTableMetadataSerializer serializer = new SSTableMetadataSerializer();
-
- public final EstimatedHistogram estimatedRowSize;
- public final EstimatedHistogram estimatedColumnCount;
- public final ReplayPosition replayPosition;
- public final long minTimestamp;
- public final long maxTimestamp;
- public final int maxLocalDeletionTime;
- public final double bloomFilterFPChance;
- public final double compressionRatio;
- public final String partitioner;
- public final StreamingHistogram estimatedTombstoneDropTime;
- public final int sstableLevel;
- public final List<ByteBuffer> maxColumnNames;
- public final List<ByteBuffer> minColumnNames;
-
- private SSTableMetadata()
- {
- this(defaultRowSizeHistogram(),
- defaultColumnCountHistogram(),
- ReplayPosition.NONE,
- Long.MIN_VALUE,
- Long.MAX_VALUE,
- Integer.MAX_VALUE,
- NO_BLOOM_FLITER_FP_CHANCE,
- NO_COMPRESSION_RATIO,
- null,
- defaultTombstoneDropTimeHistogram(),
- 0,
- Collections.<ByteBuffer>emptyList(),
- Collections.<ByteBuffer>emptyList());
- }
-
- private SSTableMetadata(EstimatedHistogram rowSizes,
- EstimatedHistogram columnCounts,
- ReplayPosition replayPosition,
- long minTimestamp,
- long maxTimestamp,
- int maxLocalDeletionTime,
- double bloomFilterFPChance,
- double compressionRatio,
- String partitioner,
- StreamingHistogram estimatedTombstoneDropTime,
- int sstableLevel,
- List<ByteBuffer> minColumnNames,
- List<ByteBuffer> maxColumnNames)
- {
- this.estimatedRowSize = rowSizes;
- this.estimatedColumnCount = columnCounts;
- this.replayPosition = replayPosition;
- this.minTimestamp = minTimestamp;
- this.maxTimestamp = maxTimestamp;
- this.maxLocalDeletionTime = maxLocalDeletionTime;
- this.bloomFilterFPChance = bloomFilterFPChance;
- this.compressionRatio = compressionRatio;
- this.partitioner = partitioner;
- this.estimatedTombstoneDropTime = estimatedTombstoneDropTime;
- this.sstableLevel = sstableLevel;
- this.minColumnNames = minColumnNames;
- this.maxColumnNames = maxColumnNames;
- }
-
- public static Collector createCollector(AbstractType<?> columnNameComparator)
- {
- return new Collector(columnNameComparator);
- }
-
- public static Collector createCollector(Collection<SSTableReader> sstables, AbstractType<?> columnNameComparator, int level)
- {
- Collector collector = new Collector(columnNameComparator);
-
- collector.replayPosition(ReplayPosition.getReplayPosition(sstables));
- collector.sstableLevel(level);
- // Get the max timestamp of the precompacted sstables
- // and adds generation of live ancestors
- for (SSTableReader sstable : sstables)
- {
- collector.addAncestor(sstable.descriptor.generation);
- for (Integer i : sstable.getAncestors())
- {
- if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
- collector.addAncestor(i);
- }
- }
-
- return collector;
- }
-
- /**
- * Used when updating sstablemetadata files with an sstable level
- * @param metadata
- * @param sstableLevel
- * @return
- */
- @Deprecated
- public static SSTableMetadata copyWithNewSSTableLevel(SSTableMetadata metadata, int sstableLevel)
- {
- return new SSTableMetadata(metadata.estimatedRowSize,
- metadata.estimatedColumnCount,
- metadata.replayPosition,
- metadata.minTimestamp,
- metadata.maxTimestamp,
- metadata.maxLocalDeletionTime,
- metadata.bloomFilterFPChance,
- metadata.compressionRatio,
- metadata.partitioner,
- metadata.estimatedTombstoneDropTime,
- sstableLevel,
- metadata.minColumnNames,
- metadata.maxColumnNames);
-
- }
-
- static EstimatedHistogram defaultColumnCountHistogram()
- {
- // EH of 114 can track a max value of 2395318855, i.e., > 2B columns
- return new EstimatedHistogram(114);
- }
-
- static EstimatedHistogram defaultRowSizeHistogram()
- {
- // EH of 150 can track a max value of 1697806495183, i.e., > 1.5PB
- return new EstimatedHistogram(150);
- }
-
- static StreamingHistogram defaultTombstoneDropTimeHistogram()
- {
- return new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
- }
-
- /**
- * @param gcBefore
- * @return estimated droppable tombstone ratio at given gcBefore time.
- */
- public double getEstimatedDroppableTombstoneRatio(int gcBefore)
- {
- long estimatedColumnCount = this.estimatedColumnCount.mean() * this.estimatedColumnCount.count();
- if (estimatedColumnCount > 0)
- {
- double droppable = getDroppableTombstonesBefore(gcBefore);
- return droppable / estimatedColumnCount;
- }
- return 0.0f;
- }
-
- /**
- * Get the amount of droppable tombstones
- * @param gcBefore the gc time
- * @return amount of droppable tombstones
- */
- public double getDroppableTombstonesBefore(int gcBefore)
- {
- return estimatedTombstoneDropTime.sum(gcBefore);
- }
-
- public static class Collector
- {
- protected EstimatedHistogram estimatedRowSize = defaultRowSizeHistogram();
- protected EstimatedHistogram estimatedColumnCount = defaultColumnCountHistogram();
- protected ReplayPosition replayPosition = ReplayPosition.NONE;
- protected long minTimestamp = Long.MAX_VALUE;
- protected long maxTimestamp = Long.MIN_VALUE;
- protected int maxLocalDeletionTime = Integer.MIN_VALUE;
- protected double compressionRatio = NO_COMPRESSION_RATIO;
- protected Set<Integer> ancestors = new HashSet<Integer>();
- protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram();
- protected int sstableLevel;
- protected List<ByteBuffer> minColumnNames = Collections.emptyList();
- protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
- private final AbstractType<?> columnNameComparator;
-
- private Collector(AbstractType<?> columnNameComparator)
- {
- this.columnNameComparator = columnNameComparator;
- }
- public void addRowSize(long rowSize)
- {
- estimatedRowSize.add(rowSize);
- }
-
- public void addColumnCount(long columnCount)
- {
- estimatedColumnCount.add(columnCount);
- }
-
- public void mergeTombstoneHistogram(StreamingHistogram histogram)
- {
- estimatedTombstoneDropTime.merge(histogram);
- }
-
- /**
- * Ratio is compressed/uncompressed and it is
- * if you have 1.x then compression isn't helping
- */
- public void addCompressionRatio(long compressed, long uncompressed)
- {
- compressionRatio = (double) compressed/uncompressed;
- }
-
- public void updateMinTimestamp(long potentialMin)
- {
- minTimestamp = Math.min(minTimestamp, potentialMin);
- }
-
- public void updateMaxTimestamp(long potentialMax)
- {
- maxTimestamp = Math.max(maxTimestamp, potentialMax);
- }
-
- public void updateMaxLocalDeletionTime(int maxLocalDeletionTime)
- {
- this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
- }
-
- public SSTableMetadata finalizeMetadata(String partitioner, double bloomFilterFPChance)
- {
- return new SSTableMetadata(estimatedRowSize,
- estimatedColumnCount,
- replayPosition,
- minTimestamp,
- maxTimestamp,
- maxLocalDeletionTime,
- bloomFilterFPChance,
- compressionRatio,
- partitioner,
- estimatedTombstoneDropTime,
- sstableLevel,
- minColumnNames,
- maxColumnNames);
- }
-
- public Collector estimatedRowSize(EstimatedHistogram estimatedRowSize)
- {
- this.estimatedRowSize = estimatedRowSize;
- return this;
- }
-
- public Collector estimatedColumnCount(EstimatedHistogram estimatedColumnCount)
- {
- this.estimatedColumnCount = estimatedColumnCount;
- return this;
- }
-
- public Collector replayPosition(ReplayPosition replayPosition)
- {
- this.replayPosition = replayPosition;
- return this;
- }
-
- public Collector addAncestor(int generation)
- {
- this.ancestors.add(generation);
- return this;
- }
-
- void update(long size, ColumnStats stats)
- {
- updateMinTimestamp(stats.minTimestamp);
- /*
- * The max timestamp is not always collected here (more precisely, row.maxTimestamp() may return Long.MIN_VALUE),
- * to avoid deserializing an EchoedRow.
- * This is the reason why it is collected first when calling ColumnFamilyStore.createCompactionWriter
- * However, for old sstables without timestamp, we still want to update the timestamp (and we know
- * that in this case we will not use EchoedRow, since CompactionControler.needsDeserialize() will be true).
- */
- updateMaxTimestamp(stats.maxTimestamp);
- updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
- addRowSize(size);
- addColumnCount(stats.columnCount);
- mergeTombstoneHistogram(stats.tombstoneHistogram);
- updateMinColumnNames(stats.minColumnNames);
- updateMaxColumnNames(stats.maxColumnNames);
- }
-
- public Collector sstableLevel(int sstableLevel)
- {
- this.sstableLevel = sstableLevel;
- return this;
- }
-
- public Collector updateMinColumnNames(List<ByteBuffer> minColumnNames)
- {
- if (minColumnNames.size() > 0)
- this.minColumnNames = ColumnNameHelper.mergeMin(this.minColumnNames, minColumnNames, columnNameComparator);
- return this;
- }
-
- public Collector updateMaxColumnNames(List<ByteBuffer> maxColumnNames)
- {
- if (maxColumnNames.size() > 0)
- this.maxColumnNames = ColumnNameHelper.mergeMax(this.maxColumnNames, maxColumnNames, columnNameComparator);
- return this;
- }
- }
-
- public static class SSTableMetadataSerializer
- {
- private static final Logger logger = LoggerFactory.getLogger(SSTableMetadataSerializer.class);
-
- public void serialize(SSTableMetadata sstableStats, Set<Integer> ancestors, DataOutput out) throws IOException
- {
- assert sstableStats.partitioner != null;
-
- EstimatedHistogram.serializer.serialize(sstableStats.estimatedRowSize, out);
- EstimatedHistogram.serializer.serialize(sstableStats.estimatedColumnCount, out);
- ReplayPosition.serializer.serialize(sstableStats.replayPosition, out);
- out.writeLong(sstableStats.minTimestamp);
- out.writeLong(sstableStats.maxTimestamp);
- out.writeInt(sstableStats.maxLocalDeletionTime);
- out.writeDouble(sstableStats.bloomFilterFPChance);
- out.writeDouble(sstableStats.compressionRatio);
- out.writeUTF(sstableStats.partitioner);
- out.writeInt(ancestors.size());
- for (Integer g : ancestors)
- out.writeInt(g);
- StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, out);
- out.writeInt(sstableStats.sstableLevel);
- serializeMinMaxColumnNames(sstableStats.minColumnNames, sstableStats.maxColumnNames, out);
- }
-
- private void serializeMinMaxColumnNames(List<ByteBuffer> minColNames, List<ByteBuffer> maxColNames, DataOutput out) throws IOException
- {
- out.writeInt(minColNames.size());
- for (ByteBuffer columnName : minColNames)
- ByteBufferUtil.writeWithShortLength(columnName, out);
- out.writeInt(maxColNames.size());
- for (ByteBuffer columnName : maxColNames)
- ByteBufferUtil.writeWithShortLength(columnName, out);
- }
- /**
- * Used to serialize to an old version - needed to be able to update sstable level without a full compaction.
- *
- * @deprecated will be removed when it is assumed that the minimum upgrade-from-version is the version that this
- * patch made it into
- *
- * @param sstableStats
- * @param legacyDesc
- * @param out
- * @throws IOException
- */
- @Deprecated
- public void legacySerialize(SSTableMetadata sstableStats, Set<Integer> ancestors, Descriptor legacyDesc, DataOutput out) throws IOException
- {
- EstimatedHistogram.serializer.serialize(sstableStats.estimatedRowSize, out);
- EstimatedHistogram.serializer.serialize(sstableStats.estimatedColumnCount, out);
- ReplayPosition.serializer.serialize(sstableStats.replayPosition, out);
- out.writeLong(sstableStats.minTimestamp);
- out.writeLong(sstableStats.maxTimestamp);
- if (legacyDesc.version.tracksMaxLocalDeletionTime)
- out.writeInt(sstableStats.maxLocalDeletionTime);
- if (legacyDesc.version.hasBloomFilterFPChance)
- out.writeDouble(sstableStats.bloomFilterFPChance);
- out.writeDouble(sstableStats.compressionRatio);
- out.writeUTF(sstableStats.partitioner);
- out.writeInt(ancestors.size());
- for (Integer g : ancestors)
- out.writeInt(g);
- StreamingHistogram.serializer.serialize(sstableStats.estimatedTombstoneDropTime, out);
- out.writeInt(sstableStats.sstableLevel);
- if (legacyDesc.version.tracksMaxMinColumnNames)
- serializeMinMaxColumnNames(sstableStats.minColumnNames, sstableStats.maxColumnNames, out);
- }
-
- /**
- * deserializes the metadata
- *
- * returns a pair containing the part of the metadata meant to be kept-in memory and the part
- * that should not.
- *
- * @param descriptor the descriptor
- * @return a pair containing data that needs to be in memory and data that is potentially big and is not needed
- * in memory
- * @throws IOException
- */
- public Pair<SSTableMetadata, Set<Integer>> deserialize(Descriptor descriptor) throws IOException
- {
- return deserialize(descriptor, true);
- }
-
- public Pair<SSTableMetadata, Set<Integer>> deserialize(Descriptor descriptor, boolean loadSSTableLevel) throws IOException
- {
- logger.debug("Load metadata for {}", descriptor);
- File statsFile = new File(descriptor.filenameFor(Component.STATS));
- if (!statsFile.exists())
- {
- logger.debug("No sstable stats for {}", descriptor);
- return Pair.create(new SSTableMetadata(), Collections.<Integer>emptySet());
- }
-
- DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
- try
- {
- return deserialize(in, descriptor, loadSSTableLevel);
- }
- finally
- {
- FileUtils.closeQuietly(in);
- }
- }
- public Pair<SSTableMetadata, Set<Integer>> deserialize(DataInputStream in, Descriptor desc) throws IOException
- {
- return deserialize(in, desc, true);
- }
-
- public Pair<SSTableMetadata, Set<Integer>> deserialize(DataInputStream in, Descriptor desc, boolean loadSSTableLevel) throws IOException
- {
- EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(in);
- EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(in);
- ReplayPosition replayPosition = ReplayPosition.serializer.deserialize(in);
- long minTimestamp = in.readLong();
- long maxTimestamp = in.readLong();
- int maxLocalDeletionTime = desc.version.tracksMaxLocalDeletionTime ? in.readInt() : Integer.MAX_VALUE;
- double bloomFilterFPChance = desc.version.hasBloomFilterFPChance ? in.readDouble() : NO_BLOOM_FLITER_FP_CHANCE;
- double compressionRatio = in.readDouble();
- String partitioner = in.readUTF();
- int nbAncestors = in.readInt();
- Set<Integer> ancestors = new HashSet<Integer>(nbAncestors);
- for (int i = 0; i < nbAncestors; i++)
- ancestors.add(in.readInt());
- StreamingHistogram tombstoneHistogram = StreamingHistogram.serializer.deserialize(in);
- int sstableLevel = 0;
-
- if (loadSSTableLevel && in.available() > 0)
- sstableLevel = in.readInt();
-
- List<ByteBuffer> minColumnNames;
- List<ByteBuffer> maxColumnNames;
- if (desc.version.tracksMaxMinColumnNames)
- {
- int colCount = in.readInt();
- minColumnNames = new ArrayList<ByteBuffer>(colCount);
- for (int i = 0; i < colCount; i++)
- {
- minColumnNames.add(ByteBufferUtil.readWithShortLength(in));
- }
- colCount = in.readInt();
- maxColumnNames = new ArrayList<ByteBuffer>(colCount);
- for (int i = 0; i < colCount; i++)
- {
- maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
- }
- }
- else
- {
- minColumnNames = Collections.emptyList();
- maxColumnNames = Collections.emptyList();
- }
- return Pair.create(new SSTableMetadata(rowSizes,
- columnCounts,
- replayPosition,
- minTimestamp,
- maxTimestamp,
- maxLocalDeletionTime,
- bloomFilterFPChance,
- compressionRatio,
- partitioner,
- tombstoneHistogram,
- sstableLevel,
- minColumnNames,
- maxColumnNames), ancestors);
- }
- }
-}
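For orientation, a rough mapping from the deleted monolith's fields to the new components, inferred from the new file names and the hunks that follow (a guide, not a spec):

    // ValidationMetadata: partitioner, bloomFilterFPChance
    //                     (checked once when the sstable is opened)
    // CompactionMetadata: ancestors
    //                     (potentially large, and only needed during compaction/cleanup)
    // StatsMetadata:      estimatedRowSize, estimatedColumnCount, replayPosition,
    //                     min/maxTimestamp, maxLocalDeletionTime, compressionRatio,
    //                     estimatedTombstoneDropTime, sstableLevel, min/maxColumnNames
    //                     (retained in memory by SSTableReader)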
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index f82af1f..055f4b6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -46,9 +46,11 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressedThrottledReader;
import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.metadata.*;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.service.CacheService;
@@ -123,7 +125,7 @@ public class SSTableReader extends SSTable implements Closeable
private final SSTableDeletingTask deletingTask;
// not final since we need to be able to change level on a file.
- private volatile SSTableMetadata sstableMetadata;
+ private volatile StatsMetadata sstableMetadata;
private final AtomicLong keyCacheHit = new AtomicLong(0);
private final AtomicLong keyCacheRequest = new AtomicLong(0);
@@ -173,38 +175,68 @@ public class SSTableReader extends SSTable implements Closeable
return open(desc, componentsFor(desc), metadata, p);
}
+ public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
+ {
+ return open(descriptor, components, metadata, partitioner, true);
+ }
+
public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
{
return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
}
+ /**
+ * Open an SSTable reader to be used in batch mode (such as sstableloader).
+ *
+ * @param descriptor
+ * @param components
+ * @param metadata
+ * @param partitioner
+ * @return opened SSTableReader
+ * @throws IOException
+ */
public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
{
- SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
+ assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+ Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
+ EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+ ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+ StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+ // Check if the sstable was created using the same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, validationMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
+
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
SSTableReader sstable = new SSTableReader(descriptor,
components,
metadata,
partitioner,
System.currentTimeMillis(),
- sstableMetadata);
+ statsMetadata);
// special implementation of load to use non-pooled SegmentedFile builders
SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
SegmentedFile.Builder dbuilder = sstable.compression
? new CompressedSegmentedFile.Builder()
: new BufferedSegmentedFile.Builder();
- if (!sstable.loadSummary(ibuilder, dbuilder, sstable.metadata))
+ if (!sstable.loadSummary(ibuilder, dbuilder))
sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
-
sstable.bf = FilterFactory.AlwaysPresent;
- return sstable;
- }
- public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
- {
- return open(descriptor, components, metadata, partitioner, true);
+ return sstable;
}
private static SSTableReader open(Descriptor descriptor,
@@ -213,53 +245,48 @@ public class SSTableReader extends SSTable implements Closeable
IPartitioner partitioner,
boolean validate) throws IOException
{
- long start = System.nanoTime();
- SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner);
+ // Minimum components without which we can't do anything
+ assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
+ assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
+
+ Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
+ EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
+ ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
+ StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
+
+ // Check if the sstable was created using the same partitioner.
+ // Partitioner can be null, which indicates older version of sstable or no stats available.
+ // In that case, we skip the check.
+ String partitionerName = partitioner.getClass().getCanonicalName();
+ if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
+ {
+ logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
+ descriptor, validationMetadata.partitioner, partitionerName));
+ System.exit(1);
+ }
+ logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
SSTableReader sstable = new SSTableReader(descriptor,
components,
metadata,
partitioner,
System.currentTimeMillis(),
- sstableMetadata);
+ statsMetadata);
- sstable.load();
+ // load index and filter
+ long start = System.nanoTime();
+ sstable.load(validationMetadata);
+ logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
if (validate)
sstable.validate();
- logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
if (sstable.getKeyCache() != null)
logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
return sstable;
}
- private static SSTableMetadata openMetadata(Descriptor descriptor, Set<Component> components, IPartitioner partitioner) throws IOException
- {
- assert partitioner != null;
- // Minimum components without which we can't do anything
- assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
- assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
-
- logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
-
- SSTableMetadata sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left;
-
- // Check if sstable is created using same partitioner.
- // Partitioner can be null, which indicates older version of sstable or no stats available.
- // In that case, we skip the check.
- String partitionerName = partitioner.getClass().getCanonicalName();
- if (sstableMetadata.partitioner != null && !partitionerName.equals(sstableMetadata.partitioner))
- {
- logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
- descriptor, sstableMetadata.partitioner, partitionerName));
- System.exit(1);
- }
- return sstableMetadata;
- }
-
public static void logOpenException(Descriptor descriptor, IOException e)
{
if (e instanceof FileNotFoundException)
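With openMetadata() gone, open() and openForBatch() both inline the same prologue: assert the minimum components, then deserialize only what open time needs. A hedged restatement of the selective read; note the COMPACTION component (ancestors) stays on disk until a compaction-time caller asks for it explicitly:

    Map<MetadataType, MetadataComponent> parts = descriptor.getMetadataSerializer()
            .deserialize(descriptor, EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
    ValidationMetadata validation = (ValidationMetadata) parts.get(MetadataType.VALIDATION);
    StatsMetadata stats = (StatsMetadata) parts.get(MetadataType.STATS);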
@@ -323,7 +350,7 @@ public class SSTableReader extends SSTable implements Closeable
IndexSummary isummary,
IFilter bf,
long maxDataAge,
- SSTableMetadata sstableMetadata)
+ StatsMetadata sstableMetadata)
{
assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
return new SSTableReader(desc,
@@ -343,7 +370,7 @@ public class SSTableReader extends SSTable implements Closeable
CFMetaData metadata,
IPartitioner partitioner,
long maxDataAge,
- SSTableMetadata sstableMetadata)
+ StatsMetadata sstableMetadata)
{
super(desc, components, metadata, partitioner);
this.sstableMetadata = sstableMetadata;
@@ -384,7 +411,7 @@ public class SSTableReader extends SSTable implements Closeable
IndexSummary indexSummary,
IFilter bloomFilter,
long maxDataAge,
- SSTableMetadata sstableMetadata)
+ StatsMetadata sstableMetadata)
{
this(desc, components, metadata, partitioner, maxDataAge, sstableMetadata);
@@ -436,7 +463,7 @@ public class SSTableReader extends SSTable implements Closeable
keyCache = CacheService.instance.keyCache;
}
- private void load() throws IOException
+ private void load(ValidationMetadata validation) throws IOException
{
if (metadata.getBloomFilterFpChance() == 1.0)
{
@@ -444,12 +471,12 @@ public class SSTableReader extends SSTable implements Closeable
load(false, true);
bf = FilterFactory.AlwaysPresent;
}
- else if (!components.contains(Component.FILTER))
+ else if (!components.contains(Component.FILTER) || validation == null)
{
// bf is enabled, but filter component is missing.
load(true, true);
}
- else if (descriptor.version.hasBloomFilterFPChance && sstableMetadata.bloomFilterFPChance != metadata.getBloomFilterFpChance())
+ else if (descriptor.version.hasBloomFilterFPChance && validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
{
// bf fp chance in sstable metadata and it has changed since compaction.
load(true, true);
@@ -462,7 +489,12 @@ public class SSTableReader extends SSTable implements Closeable
}
}
- void loadBloomFilter() throws IOException
+ /**
+ * Load bloom filter from Filter.db file.
+ *
+ * @throws IOException
+ */
+ private void loadBloomFilter() throws IOException
{
DataInputStream stream = null;
try
@@ -488,7 +520,7 @@ public class SSTableReader extends SSTable implements Closeable
? SegmentedFile.getCompressedBuilder()
: SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
- boolean summaryLoaded = loadSummary(ibuilder, dbuilder, metadata);
+ boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
if (recreateBloomFilter || !summaryLoaded)
buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
@@ -498,6 +530,15 @@ public class SSTableReader extends SSTable implements Closeable
saveSummary(ibuilder, dbuilder);
}
+ /**
+ * Build index summary (and optionally bloom filter) by reading through Index.db file.
+ *
+ * @param recreateBloomFilter true if recreate bloom filter
+ * @param ibuilder
+ * @param dbuilder
+ * @param summaryLoaded true if the index summary is already loaded and does not need to be built again
+ * @throws IOException
+ */
private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
{
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
@@ -552,7 +593,17 @@ public class SSTableReader extends SSTable implements Closeable
last = getMinimalKey(last);
}
- public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, CFMetaData metadata)
+ /**
+ * Load index summary from Summary.db file if it exists.
+ *
+ * If the loaded index summary has an index interval different from the current value stored in
+ * the schema, the Summary.db file is deleted and this method returns false so the summary is rebuilt.
+ *
+ * @param ibuilder
+ * @param dbuilder
+ * @return true if index summary is loaded successfully from Summary.db file.
+ */
+ public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
{
File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
if (!descriptor.version.offHeapSummaries || !summariesFile.exists())
@@ -562,15 +613,7 @@ public class SSTableReader extends SSTable implements Closeable
try
{
iStream = new DataInputStream(new FileInputStream(summariesFile));
- indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel);
- if (indexSummary.getIndexInterval() != metadata.getIndexInterval())
- {
- iStream.close();
- logger.debug("Cannot read the saved summary for {} because Index Interval changed from {} to {}.",
- toString(), indexSummary.getIndexInterval(), metadata.getIndexInterval());
- FileUtils.deleteWithConfirm(summariesFile);
- return false;
- }
+ indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel, metadata.getIndexInterval());
first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
ibuilder.deserializeBounds(iStream);
@@ -578,9 +621,10 @@ public class SSTableReader extends SSTable implements Closeable
}
catch (IOException e)
{
- logger.debug("Cannot deserialize SSTable Summary: ", e);
+ logger.debug("Cannot deserialize SSTable {} Summary: {}", toString(), e.getMessage());
// corrupted; delete it and fall back to creating a new summary
FileUtils.closeQuietly(iStream);
FileUtils.deleteWithConfirm(summariesFile);
return false;
}
@@ -592,6 +636,12 @@ public class SSTableReader extends SSTable implements Closeable
return true;
}
+ /**
+ * Save index summary to Summary.db file.
+ *
+ * @param ibuilder
+ * @param dbuilder
+ */
public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
{
saveSummary(ibuilder, dbuilder, indexSummary);
@@ -830,7 +880,7 @@ public class SSTableReader extends SSTable implements Closeable
private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
{
// use the index to determine a minimal section for each range
- List<Pair<Integer,Integer>> positions = new ArrayList<Pair<Integer,Integer>>();
+ List<Pair<Integer,Integer>> positions = new ArrayList<>();
for (Range<Token> range : Range.normalize(ranges))
{
@@ -864,7 +914,7 @@ public class SSTableReader extends SSTable implements Closeable
if (left > right)
// empty range
continue;
- positions.add(Pair.create(Integer.valueOf(left), Integer.valueOf(right)));
+ positions.add(Pair.create(left, right));
}
return positions;
}
@@ -924,7 +974,7 @@ public class SSTableReader extends SSTable implements Closeable
public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
{
// use the index to determine a minimal section for each range
- List<Pair<Long,Long>> positions = new ArrayList<Pair<Long,Long>>();
+ List<Pair<Long,Long>> positions = new ArrayList<>();
for (Range<Token> range : Range.normalize(ranges))
{
AbstractBounds<RowPosition> keyRange = range.toRowBounds();
@@ -941,7 +991,7 @@ public class SSTableReader extends SSTable implements Closeable
if (left == right)
// empty range
continue;
- positions.add(Pair.create(Long.valueOf(left), Long.valueOf(right)));
+ positions.add(Pair.create(left, right));
}
return positions;
}
@@ -1481,7 +1531,8 @@ public class SSTableReader extends SSTable implements Closeable
{
try
{
- return SSTableMetadata.serializer.deserialize(descriptor).right;
+ CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
+ return compactionMetadata.ancestors;
}
catch (IOException e)
{
@@ -1506,10 +1557,10 @@ public class SSTableReader extends SSTable implements Closeable
*/
public void reloadSSTableMetadata() throws IOException
{
- this.sstableMetadata = SSTableMetadata.serializer.deserialize(descriptor).left;
+ this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
}
- public SSTableMetadata getSSTableMetadata()
+ public StatsMetadata getSSTableMetadata()
{
return sstableMetadata;
}
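For readers following the new read path, a minimal sketch of pulling individual
components out of Stats.db through the serializer now reached via Descriptor;
the sstable path below is a made-up placeholder:

    import java.io.IOException;

    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
    import org.apache.cassandra.io.sstable.metadata.MetadataType;
    import org.apache.cassandra.io.sstable.metadata.StatsMetadata;

    public class StatsDbReadSketch
    {
        public static void main(String[] args) throws IOException
        {
            // hypothetical sstable data file; any component file of the sstable works
            Descriptor desc = Descriptor.fromFilename("/var/lib/cassandra/data/ks/cf/ks-cf-ic-1-Data.db");

            // each component type is now loaded independently instead of all at once
            StatsMetadata stats = (StatsMetadata)
                desc.getMetadataSerializer().deserialize(desc, MetadataType.STATS);
            CompactionMetadata compaction = (CompactionMetadata)
                desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION);

            System.out.println("stats loaded: " + (stats != null));
            System.out.println("ancestors: " + (compaction == null ? "unknown" : compaction.ancestors));
        }
    }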
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 5b02d37..60bb8d1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -33,6 +33,10 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -53,7 +57,7 @@ public class SSTableWriter extends SSTable
private final SequentialWriter dataFile;
private DecoratedKey lastWrittenKey;
private FileMark dataMark;
- private final SSTableMetadata.Collector sstableMetadataCollector;
+ private final MetadataCollector sstableMetadataCollector;
public SSTableWriter(String filename, long keyCount)
{
@@ -61,7 +65,7 @@ public class SSTableWriter extends SSTable
keyCount,
Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)),
StorageService.getPartitioner(),
- SSTableMetadata.createCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
+ new MetadataCollector(Schema.instance.getCFMetaData(Descriptor.fromFilename(filename)).comparator));
}
private static Set<Component> components(CFMetaData metadata)
@@ -93,7 +97,7 @@ public class SSTableWriter extends SSTable
long keyCount,
CFMetaData metadata,
IPartitioner<?> partitioner,
- SSTableMetadata.Collector sstableMetadataCollector)
+ MetadataCollector sstableMetadataCollector)
{
super(Descriptor.fromFilename(filename),
components(metadata),
@@ -308,9 +312,9 @@ public class SSTableWriter extends SSTable
public SSTableReader closeAndOpenReader(long maxDataAge)
{
- Pair<Descriptor, SSTableMetadata> p = close();
+ Pair<Descriptor, StatsMetadata> p = close();
Descriptor newdesc = p.left;
- SSTableMetadata sstableMetadata = p.right;
+ StatsMetadata sstableMetadata = p.right;
// finalize in-memory state for the reader
SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(Component.PRIMARY_INDEX));
@@ -335,36 +339,40 @@ public class SSTableWriter extends SSTable
}
// Close the writer and return the descriptor to the new sstable and its metadata
- public Pair<Descriptor, SSTableMetadata> close()
+ public Pair<Descriptor, StatsMetadata> close()
{
// index and filter
iwriter.close();
// main data, close will truncate if necessary
dataFile.close();
// write sstable statistics
- SSTableMetadata sstableMetadata = sstableMetadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+ Map<MetadataType, MetadataComponent> metadataComponents = sstableMetadataCollector.finalizeMetadata(
+ partitioner.getClass().getCanonicalName(),
metadata.getBloomFilterFpChance());
- writeMetadata(descriptor, sstableMetadata, sstableMetadataCollector.ancestors);
+ writeMetadata(descriptor, metadataComponents);
// save the table of components
SSTable.appendTOC(descriptor, components);
// remove the 'tmp' marker from all components
- return Pair.create(rename(descriptor, components), sstableMetadata);
+ return Pair.create(rename(descriptor, components), (StatsMetadata) metadataComponents.get(MetadataType.STATS));
}
- private static void writeMetadata(Descriptor desc, SSTableMetadata sstableMetadata, Set<Integer> ancestors)
+ private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
{
SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)), true);
try
{
- SSTableMetadata.serializer.serialize(sstableMetadata, ancestors, out.stream);
+ desc.getMetadataSerializer().serialize(components, out.stream);
}
catch (IOException e)
{
throw new FSWriteError(e, out.getPath());
}
- out.close();
+ finally
+ {
+ out.close();
+ }
}
static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
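A hedged sketch of the writer-side handoff shown above: the MetadataCollector
accumulates statistics during the write and freezes them into a component map
at close time; the CFMetaData and partitioner arguments are stand-ins:

    import java.util.Map;

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
    import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
    import org.apache.cassandra.io.sstable.metadata.MetadataType;
    import org.apache.cassandra.io.sstable.metadata.StatsMetadata;

    public class CollectorSketch
    {
        static StatsMetadata finalizeSketch(CFMetaData cfm, IPartitioner<?> partitioner)
        {
            MetadataCollector collector = new MetadataCollector(cfm.comparator);
            // ... the writer updates the collector for every appended row ...
            Map<MetadataType, MetadataComponent> components =
                collector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
                                           cfm.getBloomFilterFpChance());
            // STATS is the component SSTableReader keeps in memory after close()
            return (StatsMetadata) components.get(MetadataType.STATS);
        }
    }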
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
new file mode 100644
index 0000000..fd0e626
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/CompactionMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Compaction related SSTable metadata.
+ *
+ * Only loaded for <b>compacting</b> SSTables at the time of compaction.
+ */
+public class CompactionMetadata extends MetadataComponent
+{
+ public static final IMetadataComponentSerializer serializer = new CompactionMetadataSerializer();
+
+ public final Set<Integer> ancestors;
+
+ public CompactionMetadata(Set<Integer> ancestors)
+ {
+ this.ancestors = ancestors;
+ }
+
+ public MetadataType getType()
+ {
+ return MetadataType.COMPACTION;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ CompactionMetadata that = (CompactionMetadata) o;
+ return ancestors == null ? that.ancestors == null : ancestors.equals(that.ancestors);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return ancestors != null ? ancestors.hashCode() : 0;
+ }
+
+ public static class CompactionMetadataSerializer implements IMetadataComponentSerializer<CompactionMetadata>
+ {
+ public int serializedSize(CompactionMetadata component) throws IOException
+ {
+ int size = 0;
+ size += TypeSizes.NATIVE.sizeof(component.ancestors.size());
+ for (int g : component.ancestors)
+ size += TypeSizes.NATIVE.sizeof(g);
+ return size;
+ }
+
+ public void serialize(CompactionMetadata component, DataOutput out) throws IOException
+ {
+ out.writeInt(component.ancestors.size());
+ for (int g : component.ancestors)
+ out.writeInt(g);
+ }
+
+ public CompactionMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
+ {
+ int nbAncestors = in.readInt();
+ Set<Integer> ancestors = new HashSet<>(nbAncestors);
+ for (int i = 0; i < nbAncestors; i++)
+ ancestors.add(in.readInt());
+ return new CompactionMetadata(ancestors);
+ }
+ }
+}
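To make the component contract concrete, a round-trip sketch for
CompactionMetadata over in-memory streams; the generation numbers are made up,
and Descriptor.Version.CURRENT is assumed to be the current-version handle:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;

    public class CompactionMetadataRoundTrip
    {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) throws IOException
        {
            Set<Integer> ancestors = new HashSet<>();
            ancestors.add(1); // made-up ancestor generations
            ancestors.add(2);
            CompactionMetadata original = new CompactionMetadata(ancestors);

            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            CompactionMetadata.serializer.serialize(original, new DataOutputStream(buffer));

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            CompactionMetadata copy = (CompactionMetadata)
                CompactionMetadata.serializer.deserialize(Descriptor.Version.CURRENT, in);

            assert original.equals(copy); // equals() compares the ancestor sets
        }
    }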
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
new file mode 100644
index 0000000..53ca138
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataComponentSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Metadata component serializer
+ */
+public interface IMetadataComponentSerializer<T extends MetadataComponent>
+{
+ /**
+ * Calculate and return serialized size.
+ *
+ * @param component MetadataComponent whose serialized size to calculate
+ * @return serialized size of this component
+ * @throws IOException
+ */
+ int serializedSize(T component) throws IOException;
+
+ /**
+ * Serialize metadata component to given output.
+ *
+ * @param component MetadataComponent to serialize
+ * @param out serialization destination
+ * @throws IOException
+ */
+ void serialize(T component, DataOutput out) throws IOException;
+
+ /**
+ * Deserialize metadata component from given input.
+ *
+ * @param version serialization version of the sstable being read
+ * @param in deserialization source
+ * @return Deserialized component
+ * @throws IOException
+ */
+ T deserialize(Descriptor.Version version, DataInput in) throws IOException;
+}
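For context, a hedged sketch of a minimal implementation of this interface; the
component type and its single int field are invented purely for illustration
and reuse the STATS type tag rather than defining a real one:

    package org.apache.cassandra.io.sstable.metadata;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.cassandra.db.TypeSizes;
    import org.apache.cassandra.io.sstable.Descriptor;

    // hypothetical one-int component showing the three-method contract
    class HypotheticalMetadata extends MetadataComponent
    {
        public final int value;

        HypotheticalMetadata(int value)
        {
            this.value = value;
        }

        public MetadataType getType()
        {
            return MetadataType.STATS; // a real component would add its own enum constant
        }

        static class Serializer implements IMetadataComponentSerializer<HypotheticalMetadata>
        {
            public int serializedSize(HypotheticalMetadata component) throws IOException
            {
                // must account for exactly what serialize() writes
                return TypeSizes.NATIVE.sizeof(component.value);
            }

            public void serialize(HypotheticalMetadata component, DataOutput out) throws IOException
            {
                out.writeInt(component.value);
            }

            public HypotheticalMetadata deserialize(Descriptor.Version version, DataInput in) throws IOException
            {
                return new HypotheticalMetadata(in.readInt());
            }
        }
    }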
http://git-wip-us.apache.org/repos/asf/cassandra/blob/74bf5aa1/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
new file mode 100644
index 0000000..0875e5d
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.metadata;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Map;
+
+import org.apache.cassandra.io.sstable.Descriptor;
+
+/**
+ * Interface for SSTable metadata serializer
+ */
+public interface IMetadataSerializer
+{
+ /**
+ * Serialize the given metadata components to the given output.
+ *
+ * @param components Metadata components to serialize
+ * @param out serialization destination
+ * @throws IOException
+ */
+ void serialize(Map<MetadataType, MetadataComponent> components, DataOutput out) throws IOException;
+
+ /**
+ * Deserialize the specified metadata components from the given descriptor.
+ *
+ * @param descriptor SSTable descriptor
+ * @param types Metadata component types to deserialize
+ * @return Deserialized metadata components, keyed by their type.
+ * @throws IOException
+ */
+ Map<MetadataType, MetadataComponent> deserialize(Descriptor descriptor, EnumSet<MetadataType> types) throws IOException;
+
+ /**
+ * Deserialize only the metadata component of the specified type from the given descriptor.
+ *
+ * @param descriptor SSTable descriptor
+ * @param type Metadata component type to deserialize
+ * @return Deserialized metadata component. Can be null if the specified type does not exist.
+ * @throws IOException
+ */
+ MetadataComponent deserialize(Descriptor descriptor, MetadataType type) throws IOException;
+
+ /**
+ * Mutate SSTable level
+ *
+ * @param descriptor SSTable descriptor
+ * @param newLevel new SSTable level
+ * @throws IOException
+ */
+ void mutateLevel(Descriptor descriptor, int newLevel) throws IOException;
+}
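Finally, a hedged usage sketch of the interface from a tool's point of view,
along the lines of what the updated SSTableLevelResetter presumably does; the
command-line path argument is a placeholder:

    import java.io.IOException;
    import java.util.EnumSet;
    import java.util.Map;

    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
    import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
    import org.apache.cassandra.io.sstable.metadata.MetadataType;

    public class LevelResetSketch
    {
        public static void main(String[] args) throws IOException
        {
            // e.g. /var/lib/cassandra/data/ks/cf/ks-cf-ic-1-Data.db
            Descriptor desc = Descriptor.fromFilename(args[0]);
            IMetadataSerializer serializer = desc.getMetadataSerializer();

            // load only the components this tool needs, not the whole Stats.db
            Map<MetadataType, MetadataComponent> components =
                serializer.deserialize(desc, EnumSet.of(MetadataType.STATS, MetadataType.COMPACTION));
            System.out.println("loaded " + components.size() + " metadata components");

            // rewrite Stats.db in place with the sstable dropped back to level 0
            serializer.mutateLevel(desc, 0);
        }
    }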